RabbitFakePublisher(
producer, routing_key, app_id, exchange
)
Bases: FakePublisher
Publisher interface implementation used only to publish the response to an RPC or reply-to message.
Source code in faststream/rabbit/publisher/fake.py
def __init__(
    self,
    producer: "ProducerProto[Any]",
    routing_key: str,
    app_id: str | None,
    exchange: RabbitExchange,
) -> None:
    """Set up the fake publisher with the destination of the answer.

    Args:
        producer: Forwarded unchanged to the base-class initializer.
        routing_key: Stored on the instance; ``patch_command`` uses it as
            the destination of every command it patches.
        app_id: Stored on the instance; may be ``None``.
        exchange: Stored on the instance; ``patch_command`` assigns it to
            every command it patches.
    """
    super().__init__(producer=producer)

    # Plain attribute storage; the values are consumed later by patch_command.
    self.app_id = app_id
    self.exchange = exchange
    self.routing_key = routing_key
routing_key
instance-attribute
routing_key = routing_key
exchange
instance-attribute
exchange = exchange
app_id
instance-attribute
app_id = app_id
patch_command
Source code in faststream/rabbit/publisher/fake.py
def patch_command(
    self,
    cmd: Union["PublishCommand", "RabbitPublishCommand"],
) -> "RabbitPublishCommand":
    """Turn *cmd* into a RabbitMQ publish command aimed at this publisher.

    The command is first passed through the parent ``patch_command`` and
    then converted with ``RabbitPublishCommand.from_cmd``. The converted
    command gets this publisher's routing key as its destination and this
    publisher's exchange.

    Args:
        cmd: The command to patch.

    Returns:
        The converted command with destination, exchange and (when set)
        ``app_id`` filled in.
    """
    base_patched = super().patch_command(cmd)
    rabbit_cmd = RabbitPublishCommand.from_cmd(base_patched)

    rabbit_cmd.destination = self.routing_key
    rabbit_cmd.exchange = self.exchange

    # A falsy app_id (None or empty) leaves message_options untouched.
    if self.app_id:
        rabbit_cmd.message_options["app_id"] = self.app_id

    return rabbit_cmd
publish
async
publish(message, /, *, correlation_id=None)
Source code in faststream/_internal/endpoint/publisher/fake.py
| async def publish(
self,
message: SendableMessage,
/,
*,
correlation_id: str | None = None,
) -> Any | None:
msg = (
f"`{self.__class__.__name__}` can be used only to publish "
"a response for `reply-to` or `RPC` messages."
)
raise NotImplementedError(msg)
|
request
async
request(message, /, *, correlation_id=None)
Source code in faststream/_internal/endpoint/publisher/fake.py
| async def request(
self,
message: "SendableMessage",
/,
*,
correlation_id: str | None = None,
) -> Any:
msg = (
f"`{self.__class__.__name__}` can be used only to publish "
"a response for `reply-to` or `RPC` messages."
)
raise NotImplementedError(msg)
|