ChannelVisitor
faststream.redis.testing.ChannelVisitor
Bases: Visitor
visit
visit(
*,
sub: LogicSubscriber,
channel: str | None = None,
list: str | None = None,
stream: str | None = None,
) -> str | None
Source code in faststream/redis/testing.py
get_message
get_message(
channel: str, body: Any, sub: ChannelSubscriber
) -> Any
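The two signatures suggest a two-step flow: visit decides whether a subscriber should receive something published to a given destination, and get_message builds the object delivered to that subscriber. The sketch below illustrates that flow only and is not FastStream's implementation. FakeChannelSubscriber, SketchChannelVisitor, the exact-name matching rule, and the dict message shape are all assumptions made for this example; the library's real matching and message type may differ.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeChannelSubscriber:
    """Hypothetical stand-in for a channel subscriber; not a FastStream class."""

    channel_name: str


class SketchChannelVisitor:
    """Illustrative only: mirrors the shape of the two documented methods."""

    def visit(
        self,
        *,
        sub: Any,
        channel: Optional[str] = None,
        list: Optional[str] = None,
        stream: Optional[str] = None,
    ) -> Optional[str]:
        # Assumption: a channel visitor ignores list and stream destinations
        # and only handles subscribers of the channel kind.
        if channel is None or not isinstance(sub, FakeChannelSubscriber):
            return None
        # Assumption: exact-name match; return the name on a hit, else None.
        return channel if channel == sub.channel_name else None

    def get_message(
        self, channel: str, body: Any, sub: FakeChannelSubscriber
    ) -> Any:
        # Assumption: a plain dict stands in for the real message object.
        return {"type": "message", "channel": channel, "data": body}


visitor = SketchChannelVisitor()
sub = FakeChannelSubscriber("orders")

# A matching channel yields its name; the message is then built for the subscriber.
matched = visitor.visit(sub=sub, channel="orders")
message = visitor.get_message(matched, b"payload", sub) if matched is not None else None

# A non-matching channel, or a publish aimed at a list, yields None.
missed = visitor.visit(sub=sub, channel="payments")
not_a_channel = visitor.visit(sub=sub, list="orders")
```

Returning None from visit for anything the visitor does not handle would let a caller try several visitors in turn (for example one each for channels, lists, and streams) and use the first that returns a name; that dispatch loop is also an assumption, not something this page documents.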