Skip to content

FakeChannelManager

faststream.rabbit.helpers.channel_manager.FakeChannelManager #

Bases: ChannelManager

connect #

connect(connection: RobustConnection) -> None
Source code in faststream/rabbit/helpers/channel_manager.py
def connect(self, connection: "aio_pika.RobustConnection") -> None:
    raise NotImplementedError

disconnect #

disconnect() -> None
Source code in faststream/rabbit/helpers/channel_manager.py
def disconnect(self) -> None:
    raise NotImplementedError

get_channel async #

get_channel(
    channel: Optional[Channel] = None,
) -> RobustChannel
Source code in faststream/rabbit/helpers/channel_manager.py
async def get_channel(
    self,
    channel: Optional["Channel"] = None,
) -> "aio_pika.RobustChannel":
    raise NotImplementedError