TestRabbitBroker
faststream.rabbit.TestRabbitBroker #
TestRabbitBroker(
broker: RabbitBroker,
/,
*,
with_real: bool = False,
connect_only: bool | None = None,
)
TestRabbitBroker(
*brokers: RabbitBroker,
with_real: bool = False,
connect_only: bool | None = None,
)
TestRabbitBroker(
*brokers: RabbitBroker,
with_real: bool = False,
connect_only: bool | None = None,
)
Bases: TestBroker[RabbitBroker, EnterType]
A class to test RabbitMQ brokers.
Source code in faststream/rabbit/testing.py
create_publisher_fake_subscriber #
create_publisher_fake_subscriber(
broker: RabbitBroker, publisher: RabbitPublisher
) -> tuple[RabbitSubscriber, bool]