Skip to content

TestRedisBroker

faststream.redis.TestRedisBroker #

TestRedisBroker(
    broker: RedisBroker,
    /,
    *,
    with_real: bool = False,
    connect_only: bool | None = None,
)
TestRedisBroker(
    *brokers: RedisBroker,
    with_real: bool = False,
    connect_only: bool | None = None,
)
TestRedisBroker(
    *brokers: RedisBroker,
    with_real: bool = False,
    connect_only: bool | None = None,
)

Bases: TestBroker[RedisBroker, EnterType]

A class to test Redis brokers.

Source code in faststream/redis/testing.py
def __init__(
    self,
    *brokers: RedisBroker,
    with_real: bool = False,
    connect_only: bool | None = None,
) -> None:
    """Initialize the test wrapper by delegating to the parent class.

    Args:
        *brokers: Redis brokers, forwarded positionally to the parent
            initializer.
        with_real: Forwarded unchanged to the parent initializer.
        connect_only: Forwarded unchanged to the parent initializer.
    """
    super().__init__(*brokers, with_real=with_real, connect_only=connect_only)

with_real instance-attribute #

with_real = with_real

brokers instance-attribute #

brokers = brokers

connect_only instance-attribute #

connect_only = connect_only

create_publisher_fake_subscriber #

create_publisher_fake_subscriber(
    broker: RedisBroker, publisher: LogicPublisher
) -> tuple[LogicSubscriber, bool]
Source code in faststream/redis/testing.py
def create_publisher_fake_subscriber(
    self,
    broker: RedisBroker,
    publisher: "LogicPublisher",
) -> tuple["LogicSubscriber", bool]:
    """Find or create the subscriber that pairs with ``publisher``.

    Every subscriber of every broker in ``self.brokers`` is checked against
    the publisher's name-only subscriber properties using a channel, a list
    and a stream visitor. A subscriber matches as soon as one visitor
    accepts it. When several subscribers match, the last one examined is
    kept.

    Args:
        broker: Broker used to register a new subscriber when no existing
            one matches.
        publisher: Publisher whose ``subscriber_property`` describes the
            destination to look for.

    Returns:
        A ``(subscriber, is_real)`` pair. ``is_real`` is ``True`` when an
        already-registered subscriber matched, and ``False`` when a new
        subscriber was created through ``broker.subscriber``.
    """
    lookup_kwargs = publisher.subscriber_property(name_only=True)
    visitors = (ChannelVisitor(), ListVisitor(), StreamVisitor())

    matched: LogicSubscriber | None = None
    for registered in self.brokers:  # pragma: no branch
        for candidate in registered.subscribers:  # pragma: no branch
            candidate = cast("LogicSubscriber", candidate)
            # any() stops at the first accepting visitor, like the original
            # inner-loop break; the scan over subscribers keeps going.
            if any(v.visit(**lookup_kwargs, sub=candidate) for v in visitors):
                matched = candidate

    if matched is not None:
        return matched, True

    # Nothing registered fits: build a non-persistent subscriber from the
    # publisher's full property set.
    fake_options = publisher.subscriber_property(name_only=False)
    return broker.subscriber(**fake_options, persistent=False), False