Skip to content

FakeProducer

faststream.rabbit.testing.FakeProducer #

FakeProducer(broker)

Bases: AioPikaFastProducer

A fake RabbitMQ producer for testing purposes.

This class extends AioPikaFastProducer and is used to simulate RabbitMQ message publishing during tests.

Source code in faststream/rabbit/testing.py
def __init__(self, broker: RabbitBroker) -> None:
    self.broker = broker

    default_parser = AioPikaParser()
    self._parser = ParserComposition(broker._parser, default_parser.parse_message)
    self._decoder = ParserComposition(
        broker._decoder,
        default_parser.decode_message,
    )

broker instance-attribute #

broker = broker

publish async #

publish(cmd)

Publish a message to a RabbitMQ queue or exchange.

Source code in faststream/rabbit/testing.py
@override
async def publish(
    self,
    cmd: "RabbitPublishCommand",
) -> None:
    """Publish a message to a RabbitMQ queue or exchange."""
    incoming = build_message(
        message=cmd.body,
        exchange=cmd.exchange,
        routing_key=cmd.destination,
        correlation_id=cmd.correlation_id,
        headers=cmd.headers,
        reply_to=cmd.reply_to,
        serializer=self.broker.config.fd_config._serializer,
        **cmd.message_options,
    )

    called = False
    for handler in self.broker.subscribers:  # pragma: no branch
        handler = cast("RabbitSubscriber", handler)
        if _is_handler_matches(
            handler,
            incoming.routing_key,
            incoming.headers,
            cmd.exchange,
        ):
            called = True
            await self._execute_handler(incoming, handler)

    if not called:
        raise SubscriberNotFound

request async #

request(cmd)

Make a synchronous request to RabbitMQ.

Source code in faststream/rabbit/testing.py
@override
async def request(
    self,
    cmd: "RabbitPublishCommand",
) -> "PatchedMessage":
    """Make a synchronous request to RabbitMQ."""
    incoming = build_message(
        message=cmd.body,
        exchange=cmd.exchange,
        routing_key=cmd.destination,
        correlation_id=cmd.correlation_id,
        headers=cmd.headers,
        **cmd.message_options,
    )

    for handler in self.broker.subscribers:  # pragma: no branch
        handler = cast("RabbitSubscriber", handler)
        if _is_handler_matches(
            handler,
            incoming.routing_key,
            incoming.headers,
            cmd.exchange,
        ):
            with anyio.fail_after(cmd.timeout):
                return await self._execute_handler(incoming, handler)

    raise SubscriberNotFound

publish_batch async #

publish_batch(cmd)
Source code in faststream/rabbit/publisher/producer.py
@override
async def publish_batch(self, cmd: "RabbitPublishCommand") -> None:
    msg = "RabbitMQ doesn't support publishing in batches."
    raise FeatureNotSupportedException(msg)

connect #

connect(serializer=None)
Source code in faststream/rabbit/publisher/producer.py
def connect(self, serializer: Optional["SerializerProto"] = None) -> None: ...

disconnect #

disconnect()
Source code in faststream/rabbit/publisher/producer.py
def disconnect(self) -> None: ...