Skip to content

FakeChannelManager

faststream.rabbit.helpers.channel_manager.FakeChannelManager #

Bases: ChannelManager

connect #

connect(connection)
Source code in faststream/rabbit/helpers/channel_manager.py
def connect(self, connection: "aio_pika.RobustConnection") -> None:
    """Reject the call: this implementation does not support connecting.

    Args:
        connection: Accepted for signature compatibility; never used.

    Raises:
        NotImplementedError: Always, on every call.
    """
    raise NotImplementedError()

disconnect #

disconnect()
Source code in faststream/rabbit/helpers/channel_manager.py
def disconnect(self) -> None:
    """Reject the call: this implementation does not support disconnecting.

    Raises:
        NotImplementedError: Always, on every call.
    """
    raise NotImplementedError()

get_channel async #

get_channel(channel=None)
Source code in faststream/rabbit/helpers/channel_manager.py
async def get_channel(
    self,
    channel: Optional["Channel"] = None,
) -> "aio_pika.RobustChannel":
    """Reject the call: this implementation cannot provide a channel.

    The error surfaces when the returned coroutine is awaited, not when
    the method is merely called.

    Args:
        channel: Accepted for signature compatibility; never used.

    Raises:
        NotImplementedError: Always, once the coroutine is awaited.
    """
    raise NotImplementedError()