Skip to content

RabbitResponse

faststream.rabbit.response.RabbitResponse #

RabbitResponse(
    body,
    *,
    timeout=None,
    mandatory=True,
    immediate=False,
    **message_options,
)

Bases: Response

Source code in faststream/rabbit/response.py
def __init__(
    self,
    body: "AioPikaSendableMessage",
    *,
    timeout: "TimeoutType" = None,
    mandatory: bool = True,
    immediate: bool = False,
    **message_options: Unpack["MessageOptions"],
) -> None:
    headers = message_options.pop("headers", {})
    correlation_id = message_options.pop("correlation_id", None)

    super().__init__(
        body=body,
        headers=headers,
        correlation_id=correlation_id,
    )

    self.message_options: BasicMessageOptions = message_options
    self.publish_options: PublishOptions = {
        "mandatory": mandatory,
        "immediate": immediate,
        "timeout": timeout,
    }

message_options instance-attribute #

message_options = message_options

publish_options instance-attribute #

publish_options = {
    "mandatory": mandatory,
    "immediate": immediate,
    "timeout": timeout,
}

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

as_publish_command #

as_publish_command()
Source code in faststream/rabbit/response.py
@override
def as_publish_command(self) -> "RabbitPublishCommand":
    return RabbitPublishCommand(
        message=self.body,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
        routing_key="",
        **self.publish_options,
        **self.message_options,
    )