Skip to content

RabbitResponse

faststream.rabbit.response.RabbitResponse #

RabbitResponse(
    body: AioPikaSendableMessage,
    *,
    timeout: TimeoutType = None,
    mandatory: bool = True,
    immediate: bool = False,
    exchange: RabbitExchange | str | None = None,
    **message_options: Unpack[MessageOptions],
)

Bases: Response

Source code in faststream/rabbit/response.py
def __init__(
    self,
    body: "AioPikaSendableMessage",
    *,
    timeout: "TimeoutType" = None,
    mandatory: bool = True,
    immediate: bool = False,
    exchange: RabbitExchange | str | None = None,
    **message_options: Unpack["MessageOptions"],
) -> None:
    headers = message_options.pop("headers", {})
    correlation_id = message_options.pop("correlation_id", None)

    super().__init__(
        body=body,
        headers=headers,
        correlation_id=correlation_id,
    )

    self.exchange = None if exchange is None else RabbitExchange.validate(exchange)
    self.message_options: BasicMessageOptions = message_options
    self.publish_options: PublishOptions = {
        "mandatory": mandatory,
        "immediate": immediate,
        "timeout": timeout,
    }

exchange instance-attribute #

exchange = None if exchange is None else validate(exchange)

message_options instance-attribute #

message_options: BasicMessageOptions = message_options

publish_options instance-attribute #

publish_options: PublishOptions = {
    "mandatory": mandatory,
    "immediate": immediate,
    "timeout": timeout,
}

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

as_publish_command #

as_publish_command() -> RabbitPublishCommand
Source code in faststream/rabbit/response.py
@override
def as_publish_command(self) -> "RabbitPublishCommand":
    return RabbitPublishCommand(
        message=self.body,
        _publish_type=PublishType.PUBLISH,
        routing_key="",
        exchange=self.exchange,
        **self.publish_options,
        headers=self.headers,
        correlation_id=self.correlation_id,
        **self.message_options,
    )

get_publish_key #

get_publish_key() -> Any | None

Get the key for publishing this message.

Override this method in subclasses to provide broker-specific keys. Default implementation returns None (no key).

RETURNS DESCRIPTION
Any | None

The key for publishing, or None if this Response type doesn't use keys.

Source code in faststream/response/response.py
def get_publish_key(self) -> Any | None:
    """Get the key for publishing this message.

    Override this method in subclasses to provide broker-specific keys.
    Default implementation returns None (no key).

    Returns:
        The key for publishing, or None if this Response type doesn't use keys.
    """
    return None