Skip to content

TestRabbitBroker

faststream.rabbit.TestRabbitBroker #

TestRabbitBroker(
    broker: RabbitBroker,
    /,
    *,
    with_real: bool = False,
    connect_only: bool | None = None,
)
TestRabbitBroker(
    *brokers: RabbitBroker,
    with_real: bool = False,
    connect_only: bool | None = None,
)
TestRabbitBroker(
    *brokers: RabbitBroker,
    with_real: bool = False,
    connect_only: bool | None = None,
)

Bases: TestBroker[RabbitBroker, EnterType]

A class to test RabbitMQ brokers.

Source code in faststream/rabbit/testing.py
def __init__(
    self,
    *brokers: RabbitBroker,
    with_real: bool = False,
    connect_only: bool | None = None,
) -> None:
    super().__init__(
        *brokers,
        with_real=with_real,
        connect_only=connect_only,
    )

with_real instance-attribute #

with_real = with_real

brokers instance-attribute #

brokers = brokers

connect_only instance-attribute #

connect_only = connect_only

create_publisher_fake_subscriber #

create_publisher_fake_subscriber(
    broker: RabbitBroker, publisher: RabbitPublisher
) -> tuple[RabbitSubscriber, bool]
Source code in faststream/rabbit/testing.py
def create_publisher_fake_subscriber(
    self,
    broker: "RabbitBroker",
    publisher: "RabbitPublisher",
) -> tuple["RabbitSubscriber", bool]:
    sub: RabbitSubscriber | None = None
    for handler in (s for b in self.brokers for s in b.subscribers):
        handler = cast("RabbitSubscriber", handler)
        if _is_handler_matches(
            handler,
            publisher.routing(),
            {},
            publisher.exchange,
        ):
            sub = handler
            break

    if sub is None:
        is_real = False
        sub = broker.subscriber(
            queue=publisher.routing(),
            exchange=publisher.exchange,
            persistent=False,
        )
    else:
        is_real = True

    return sub, is_real