Skip to content

RabbitPublishCommand

faststream.rabbit.response.RabbitPublishCommand #

RabbitPublishCommand(
    message,
    *,
    _publish_type,
    routing_key="",
    exchange=None,
    mandatory=True,
    immediate=False,
    timeout=None,
    **message_options,
)

Bases: PublishCommand

Source code in faststream/rabbit/response.py
def __init__(
    self,
    message: "AioPikaSendableMessage",
    *,
    _publish_type: PublishType,
    routing_key: str = "",
    exchange: RabbitExchange | None = None,
    # publish kwargs
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    **message_options: Unpack["MessageOptions"],
) -> None:
    headers = message_options.pop("headers", {})
    reply_to = message_options.pop("reply_to", None) or ""
    correlation_id = message_options.pop("correlation_id", None)

    super().__init__(
        body=message,
        destination=routing_key,
        correlation_id=correlation_id,
        headers=headers,
        reply_to=reply_to,
        _publish_type=_publish_type,
    )
    self.exchange = exchange or RabbitExchange()

    self.timeout = timeout

    self.message_options: BasicMessageOptions = message_options
    self.publish_options: PublishOptions = {
        "mandatory": mandatory,
        "immediate": immediate,
    }

exchange instance-attribute #

exchange = exchange or RabbitExchange()

timeout instance-attribute #

timeout = timeout

message_options instance-attribute #

message_options = message_options

publish_options instance-attribute #

publish_options = {
    "mandatory": mandatory,
    "immediate": immediate,
}

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

destination instance-attribute #

destination = destination

reply_to instance-attribute #

reply_to = reply_to

publish_type instance-attribute #

publish_type = _publish_type

batch_bodies property #

batch_bodies

from_cmd classmethod #

from_cmd(cmd)
Source code in faststream/rabbit/response.py
@classmethod
def from_cmd(
    cls,
    cmd: Union["PublishCommand", "RabbitPublishCommand"],
) -> "RabbitPublishCommand":
    if isinstance(cmd, RabbitPublishCommand):
        # NOTE: Should return a copy probably.
        return cmd

    return cls(
        message=cmd.body,
        routing_key=cmd.destination,
        correlation_id=cmd.correlation_id,
        headers=cmd.headers,
        reply_to=cmd.reply_to,
        _publish_type=cmd.publish_type,
    )

as_publish_command #

as_publish_command()

Method to transform handlers' Response result to DTO for publishers.

Source code in faststream/response/response.py
def as_publish_command(self) -> "PublishCommand":
    """Method to transform handlers' Response result to DTO for publishers."""
    return PublishCommand(
        body=self.body,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
    )

add_headers #

add_headers(headers, *, override=True)
Source code in faststream/response/response.py
def add_headers(
    self,
    headers: dict[str, Any],
    *,
    override: bool = True,
) -> None:
    if override:
        self.headers |= headers
    else:
        self.headers = headers | self.headers