Visitor

faststream.redis.testing.Visitor

Bases: Protocol

visit

visit(
    *,
    channel: str | None,
    list: str | None,
    stream: str | None,
    sub: LogicSubscriber,
) -> str | None
Source code in faststream/redis/testing.py
def visit(
    self,
    *,
    channel: str | None,
    list: str | None,
    stream: str | None,
    sub: "LogicSubscriber",
) -> str | None: ...

get_message

get_message(
    channel: str, body: Any, sub: LogicSubscriber
) -> Any
Source code in faststream/redis/testing.py
def get_message(self, channel: str, body: Any, sub: "LogicSubscriber") -> Any: ...
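
Because Visitor is a Protocol, any class that provides both methods with compatible signatures satisfies it structurally, without inheriting from it. The sketch below illustrates that idea only. It does not import FastStream: the `Visitor` class is a local mirror of the two signatures shown above, `FakeSubscriber` is a hypothetical stand-in for `LogicSubscriber`, and `ExactChannelVisitor` with its matching rule and message shape are assumptions made for illustration, not the library's behavior.

```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol


@dataclass
class FakeSubscriber:
    """Hypothetical stand-in for LogicSubscriber (not the FastStream class)."""

    channel_name: str


class Visitor(Protocol):
    """Local mirror of the two documented method signatures."""

    def visit(
        self,
        *,
        channel: Optional[str],
        list: Optional[str],
        stream: Optional[str],
        sub: Any,
    ) -> Optional[str]: ...

    def get_message(self, channel: str, body: Any, sub: Any) -> Any: ...


class ExactChannelVisitor:
    """Hypothetical visitor: matches only when the channel names are equal."""

    def visit(self, *, channel, list, stream, sub):
        # Return the matched name, or None when this subscriber does not apply.
        if channel is not None and channel == sub.channel_name:
            return channel
        return None

    def get_message(self, channel, body, sub):
        # Assumed message shape for the sketch; the real format may differ.
        return {"channel": channel, "data": body}


# No inheritance is needed: the class satisfies the protocol structurally.
visitor: Visitor = ExactChannelVisitor()
sub = FakeSubscriber(channel_name="orders")

matched = visitor.visit(channel="orders", list=None, stream=None, sub=sub)
if matched is not None:
    message = visitor.get_message(matched, b"payload", sub)
```

Note that `visit` takes keyword-only arguments, so callers must pass `channel`, `list`, `stream`, and `sub` by name; a `None` return signals that the visitor does not handle the given destination.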