Skip to content

RabbitPublisher

faststream.rabbit.publisher.usecase.RabbitPublisher #

RabbitPublisher(config, specification)

Bases: PublisherUsecase

A class to represent a RabbitMQ publisher.

Source code in faststream/rabbit/publisher/usecase.py
def __init__(
    self,
    config: "RabbitPublisherConfig",
    specification: "PublisherSpecification[Any, Any]",
) -> None:
    """Set up the publisher's routing defaults and default message options.

    Args:
        config: Publisher configuration. Its ``queue``, ``routing_key``,
            ``exchange`` and ``message_kwargs`` attributes are read here.
            ``headers``, ``reply_to`` and ``timeout`` are popped out of
            ``config.message_kwargs`` (the mapping is mutated in place).
        specification: Specification object forwarded to the base class.
    """
    super().__init__(config, specification)

    self.queue = config.queue
    self.routing_key = config.routing_key

    self.exchange = config.exchange

    # Use a ``None`` default like the two pops below, so a config that does
    # not carry a "headers" key yields empty headers instead of a KeyError.
    self.headers = config.message_kwargs.pop("headers", None) or {}
    self.reply_to = config.message_kwargs.pop("reply_to", None) or ""
    self.timeout = config.message_kwargs.pop("timeout", None)

    # Each filter call receives its own copy of the remaining kwargs; only
    # the first element of the returned pair is kept.
    # NOTE(review): presumably that first element holds just the keys declared
    # by the given options type -- confirm against ``filter_by_dict``.
    message_options, _ = filter_by_dict(
        BasicMessageOptions,
        dict(config.message_kwargs),
    )
    self._message_options = message_options

    publish_options, _ = filter_by_dict(PublishOptions, dict(config.message_kwargs))
    self.publish_options = publish_options

queue instance-attribute #

queue = queue

routing_key instance-attribute #

routing_key = routing_key

exchange instance-attribute #

exchange = exchange

headers instance-attribute #

headers = pop('headers') or {}

reply_to instance-attribute #

reply_to = pop('reply_to', None) or ''

timeout instance-attribute #

timeout = pop('timeout', None)

publish_options instance-attribute #

publish_options = publish_options

message_options property #

message_options

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

routing #

routing(*, queue=None, routing_key='')
Source code in faststream/rabbit/publisher/usecase.py
def routing(
    self,
    *,
    queue: Union["RabbitQueue", str, None] = None,
    routing_key: str = "",
) -> str:
    """Resolve the routing key to publish with.

    Resolution order:

    1. A non-empty ``routing_key`` argument is returned unchanged.
    2. Otherwise, if ``RabbitQueue.validate(queue)`` yields a truthy queue,
       that queue's ``routing()`` value is returned.
    3. Otherwise the publisher's own ``routing_key`` (or, when that is
       empty, its queue's ``routing()``) is returned with the outer
       config prefix prepended.

    Note that the prefix is applied only in the third case.
    """
    if routing_key:
        return routing_key

    explicit_queue = RabbitQueue.validate(queue)
    if explicit_queue:
        return explicit_queue.routing()

    default_key = self.routing_key or self.queue.routing()
    return f"{self._outer_config.prefix}{default_key}"

start async #

start()
Source code in faststream/rabbit/publisher/usecase.py
async def start(self) -> None:
    """Declare the configured exchange, if any, then run the base ``start``."""
    exchange = self.exchange
    if exchange is not None:
        declarer = self._outer_config.declarer
        await declarer.declare_exchange(exchange)
    return await super().start()

publish async #

publish(
    message,
    queue=None,
    exchange=None,
    *,
    routing_key="",
    **publish_kwargs,
)
Source code in faststream/rabbit/publisher/usecase.py
@override
async def publish(
    self,
    message: "AioPikaSendableMessage",
    queue: Union["RabbitQueue", str, None] = None,
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    **publish_kwargs: "Unpack[PublishKwargs]",
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
    """Publish ``message`` using this publisher's defaults.

    Args:
        message: Body to send.
        queue: Optional queue used to derive the routing key when
            ``routing_key`` is empty (see ``routing``).
        exchange: Optional exchange; falls back to the publisher's own
            exchange when falsy.
        routing_key: Explicit routing key; when non-empty it is used as is.
        **publish_kwargs: Per-call options. ``headers`` are merged over the
            publisher's default headers; ``correlation_id`` overrides the
            generated one; everything else overrides the publisher's
            default publish/message options.

    Returns:
        Whatever ``_basic_publish`` returns for the built command
        (annotated as an optional confirmation frame).
    """
    # Always merge into a fresh dict: the command must never receive the
    # publisher's own ``self.headers`` object, otherwise anything that
    # mutates the command's headers would silently change the defaults
    # used by every later publish. Per-call headers win on key conflicts.
    headers = self.headers | (publish_kwargs.pop("headers", None) or {})

    correlation_id = publish_kwargs.pop("correlation_id", gen_cor_id())

    cmd = RabbitPublishCommand(
        message,
        routing_key=self.routing(queue=queue, routing_key=routing_key),
        exchange=RabbitExchange.validate(exchange or self.exchange),
        headers=headers,
        correlation_id=correlation_id,
        _publish_type=PublishType.PUBLISH,
        # Right-most operand wins: per-call kwargs override message options,
        # which override publish options.
        **(self.publish_options | self.message_options | publish_kwargs),  # type: ignore[operator]
    )

    frame: aiormq.abc.ConfirmationFrameType | None = await self._basic_publish(
        cmd,
        producer=self._outer_config.producer,
        _extra_middlewares=(),
    )
    return frame

request async #

request(
    message,
    queue=None,
    exchange=None,
    *,
    routing_key="",
    **publish_kwargs,
)
Source code in faststream/rabbit/publisher/usecase.py
@override
async def request(
    self,
    message: "AioPikaSendableMessage",
    queue: Union["RabbitQueue", str, None] = None,
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    **publish_kwargs: "Unpack[PublishKwargs]",
) -> "RabbitMessage":
    """Send ``message`` via ``_basic_request`` and return the resulting message.

    Args:
        message: Body to send.
        queue: Optional queue used to derive the routing key when
            ``routing_key`` is empty (see ``routing``).
        exchange: Optional exchange; falls back to the publisher's own
            exchange when falsy.
        routing_key: Explicit routing key; when non-empty it is used as is.
        **publish_kwargs: Per-call options. ``headers`` are merged over the
            publisher's default headers; ``correlation_id`` overrides the
            generated one; everything else overrides the publisher's
            default publish/message options.

    Returns:
        The ``RabbitMessage`` produced by ``_basic_request``.
    """
    # Always merge into a fresh dict: the command must never receive the
    # publisher's own ``self.headers`` object, otherwise anything that
    # mutates the command's headers would silently change the defaults
    # used by every later call. Per-call headers win on key conflicts.
    headers = self.headers | (publish_kwargs.pop("headers", None) or {})

    correlation_id = publish_kwargs.pop("correlation_id", gen_cor_id())

    cmd = RabbitPublishCommand(
        message,
        routing_key=self.routing(queue=queue, routing_key=routing_key),
        exchange=RabbitExchange.validate(exchange or self.exchange),
        correlation_id=correlation_id,
        headers=headers,
        # NOTE(review): a request is tagged with the PUBLISH type here --
        # confirm this is intended rather than a dedicated request type.
        _publish_type=PublishType.PUBLISH,
        # Right-most operand wins: per-call kwargs override message options,
        # which override publish options.
        **(self.publish_options | self.message_options | publish_kwargs),  # type: ignore[operator]
    )

    msg: RabbitMessage = await self._basic_request(
        cmd,
        producer=self._outer_config.producer,
    )
    return msg

set_test #

set_test(*, mock, with_fake)

Switch the publisher into testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Switch the publisher into testing mode.

    Stores ``mock`` on the instance and records ``with_fake`` as the
    ``_fake_handler`` flag.
    """
    self.mock, self._fake_handler = mock, with_fake

reset_test #

reset_test()

Turn off the publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Leave testing mode: clear the fake-handler flag and reset the mock."""
    self._fake_handler = False
    current_mock = self.mock
    current_mock.reset_mock()

schema #

schema()
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    """Return the schema produced by this publisher's specification."""
    spec = self.specification
    return spec.get_schema()