ChannelManager

faststream.rabbit.helpers.channel_manager.ChannelManager

Bases: Protocol

connect

connect(connection)
Source code in faststream/rabbit/helpers/channel_manager.py
def connect(self, connection: "aio_pika.RobustConnection") -> None: ...

disconnect

disconnect()
Source code in faststream/rabbit/helpers/channel_manager.py
def disconnect(self) -> None: ...

get_channel async

get_channel(channel=None)

Declare a channel.

Source code in faststream/rabbit/helpers/channel_manager.py
async def get_channel(
    self,
    channel: Optional["Channel"] = None,
) -> "aio_pika.RobustChannel":
    """Declare a channel."""
    ...
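
Example

Because ChannelManager is a Protocol, any object exposing these three methods satisfies it structurally. The sketch below is illustrative only and is not FastStream's implementation: `ChannelManagerLike` is a local mirror of the signatures above with the types loosened to `Any`, and `FakeConnection` and `InMemoryChannelManager` are hypothetical stand-ins so the example runs without aio_pika or a broker. The caching behavior shown is an assumption made for the demo, not something the protocol requires.

```python
import asyncio
from typing import Any, Dict, Optional, Protocol


class ChannelManagerLike(Protocol):
    """Local mirror of the three documented methods (types loosened to Any)."""

    def connect(self, connection: Any) -> None: ...

    def disconnect(self) -> None: ...

    async def get_channel(self, channel: Optional[Any] = None) -> Any: ...


class FakeConnection:
    """Hypothetical stand-in for a connection; hands out numbered labels."""

    def __init__(self) -> None:
        self.opened = 0

    async def channel(self) -> str:
        self.opened += 1
        return f"channel-{self.opened}"


class InMemoryChannelManager:
    """Caches one channel per requested key (an assumption for this demo)."""

    def __init__(self) -> None:
        self._connection: Optional[FakeConnection] = None
        self._channels: Dict[Optional[Any], str] = {}

    def connect(self, connection: FakeConnection) -> None:
        # Remember the connection; channels are opened lazily on request.
        self._connection = connection

    def disconnect(self) -> None:
        # Forget the connection and drop every cached channel.
        self._connection = None
        self._channels.clear()

    async def get_channel(self, channel: Optional[Any] = None) -> str:
        if self._connection is None:
            raise RuntimeError("call connect() before get_channel()")
        if channel not in self._channels:
            self._channels[channel] = await self._connection.channel()
        return self._channels[channel]


async def demo() -> list:
    # The annotation checks structurally: no inheritance from the protocol.
    manager: ChannelManagerLike = InMemoryChannelManager()
    manager.connect(FakeConnection())
    first = await manager.get_channel()
    second = await manager.get_channel()  # same key, so the cache is reused
    manager.disconnect()
    return [first, second]
```

Running `asyncio.run(demo())` exercises the full lifecycle: connect, two `get_channel()` calls, then disconnect. A static type checker accepts the `ChannelManagerLike` annotation because the class matches the protocol's method shapes.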