Bases: AioPikaFastProducer
A fake RabbitMQ producer for testing purposes.
This class extends AioPikaFastProducer and is used to simulate RabbitMQ message publishing during tests.
Source code in faststream/rabbit/testing.py
| def __init__(self, broker: RabbitBroker) -> None:
self.broker = broker
default_parser = AioPikaParser()
self._parser = ParserComposition(broker._parser, default_parser.parse_message)
self._decoder = ParserComposition(
broker._decoder,
default_parser.decode_message,
)
|
broker
instance-attribute
publish
async
Publish a message to a RabbitMQ queue or exchange.
Source code in faststream/rabbit/testing.py
| @override
async def publish(
self,
cmd: "RabbitPublishCommand",
) -> None:
"""Publish a message to a RabbitMQ queue or exchange."""
incoming = build_message(
message=cmd.body,
exchange=cmd.exchange,
routing_key=cmd.destination,
correlation_id=cmd.correlation_id,
headers=cmd.headers,
reply_to=cmd.reply_to,
serializer=self.broker.config.fd_config._serializer,
**cmd.message_options,
)
called = False
for handler in self.broker.subscribers: # pragma: no branch
handler = cast("RabbitSubscriber", handler)
if _is_handler_matches(
handler,
incoming.routing_key,
incoming.headers,
cmd.exchange,
):
called = True
await self._execute_handler(incoming, handler)
if not called:
raise SubscriberNotFound
|
request
async
Make a synchronous request to RabbitMQ.
Source code in faststream/rabbit/testing.py
| @override
async def request(
self,
cmd: "RabbitPublishCommand",
) -> "PatchedMessage":
"""Make a synchronous request to RabbitMQ."""
incoming = build_message(
message=cmd.body,
exchange=cmd.exchange,
routing_key=cmd.destination,
correlation_id=cmd.correlation_id,
headers=cmd.headers,
**cmd.message_options,
)
for handler in self.broker.subscribers: # pragma: no branch
handler = cast("RabbitSubscriber", handler)
if _is_handler_matches(
handler,
incoming.routing_key,
incoming.headers,
cmd.exchange,
):
with anyio.fail_after(cmd.timeout):
return await self._execute_handler(incoming, handler)
raise SubscriberNotFound
|
publish_batch
async
Source code in faststream/rabbit/publisher/producer.py
| @override
async def publish_batch(self, cmd: "RabbitPublishCommand") -> None:
msg = "RabbitMQ doesn't support publishing in batches."
raise FeatureNotSupportedException(msg)
|
connect
Source code in faststream/rabbit/publisher/producer.py
| def connect(self, serializer: Optional["SerializerProto"] = None) -> None: ...
|
disconnect
Source code in faststream/rabbit/publisher/producer.py
| def disconnect(self) -> None: ...
|