Skip to content

RabbitFakePublisher

faststream.rabbit.publisher.fake.RabbitFakePublisher #

RabbitFakePublisher(
    producer, routing_key, app_id, exchange
)

Bases: FakePublisher

Publisher interface implementation used to publish the answer to RPC or reply-to messages.

Source code in faststream/rabbit/publisher/fake.py
def __init__(
    self,
    producer: "ProducerProto[Any]",
    routing_key: str,
    app_id: str | None,
    exchange: RabbitExchange,
) -> None:
    """Create a fake publisher bound to a fixed routing key and exchange.

    Args:
        producer: Forwarded unchanged to the parent initializer.
        routing_key: Stored on the instance as ``routing_key``.
        app_id: Stored on the instance as ``app_id``; may be ``None``.
        exchange: Stored on the instance as ``exchange``.
    """
    # The parent class owns the producer; only the Rabbit-specific
    # settings are kept on this instance.
    super().__init__(producer=producer)
    self.app_id = app_id
    self.exchange = exchange
    self.routing_key = routing_key

routing_key instance-attribute #

routing_key = routing_key

exchange instance-attribute #

exchange = exchange

app_id instance-attribute #

app_id = app_id

patch_command #

patch_command(cmd)
Source code in faststream/rabbit/publisher/fake.py
def patch_command(
    self,
    cmd: Union["PublishCommand", "RabbitPublishCommand"],
) -> "RabbitPublishCommand":
    """Return a Rabbit publish command targeted at this publisher's route.

    The command is first passed through the parent ``patch_command`` and
    then converted with ``RabbitPublishCommand.from_cmd``. Its
    ``destination`` and ``exchange`` are overwritten with the values stored
    on this publisher. ``message_options["app_id"]`` is set only when the
    stored ``app_id`` is truthy.

    Args:
        cmd: The command to patch.

    Returns:
        The converted and patched ``RabbitPublishCommand``.
    """
    patched = RabbitPublishCommand.from_cmd(super().patch_command(cmd))
    patched.destination = self.routing_key
    patched.exchange = self.exchange

    app_id = self.app_id
    if app_id:
        # A falsy app_id (None or "") leaves message_options untouched.
        patched.message_options["app_id"] = app_id
    return patched

publish async #

publish(message, /, *, correlation_id=None)
Source code in faststream/_internal/endpoint/publisher/fake.py
async def publish(
    self,
    message: SendableMessage,
    /,
    *,
    correlation_id: str | None = None,
) -> Any | None:
    msg = (
        f"`{self.__class__.__name__}` can be used only to publish "
        "a response for `reply-to` or `RPC` messages."
    )
    raise NotImplementedError(msg)

request async #

request(message, /, *, correlation_id=None)
Source code in faststream/_internal/endpoint/publisher/fake.py
async def request(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: str | None = None,
) -> Any:
    msg = (
        f"`{self.__class__.__name__}` can be used only to publish "
        "a response for `reply-to` or `RPC` messages."
    )
    raise NotImplementedError(msg)