TestNatsBroker
faststream.nats.TestNatsBroker #
TestNatsBroker(
broker: NatsBroker,
/,
*,
with_real: bool = False,
connect_only: bool | None = None,
)
TestNatsBroker(
*brokers: NatsBroker,
with_real: bool = False,
connect_only: bool | None = None,
)
TestNatsBroker(
*brokers: NatsBroker,
with_real: bool = False,
connect_only: bool | None = None,
)
Bases: TestBroker[NatsBroker, EnterType]
A class to test NATS brokers.
Source code in faststream/nats/testing.py
create_publisher_fake_subscriber #
create_publisher_fake_subscriber(
broker: NatsBroker, publisher: LogicPublisher
) -> tuple[LogicSubscriber[Any], bool]