Skip to content

ChannelManagerImpl

faststream.rabbit.helpers.channel_manager.ChannelManagerImpl #

ChannelManagerImpl(default_channel=None)

Bases: ChannelManager

Source code in faststream/rabbit/helpers/channel_manager.py
def __init__(
    self,
    default_channel: Optional["Channel"] = None,
) -> None:
    self.__connection: ConnectionState = EmptyConnectionState()

    self.__default_channel = default_channel or Channel()

    self.__channels: dict[Channel, aio_pika.RobustChannel] = {}

connect #

connect(connection)
Source code in faststream/rabbit/helpers/channel_manager.py
def connect(self, connection: "aio_pika.RobustConnection") -> None:
    self.__connection = ConnectedState(connection)

disconnect #

disconnect()
Source code in faststream/rabbit/helpers/channel_manager.py
def disconnect(self) -> None:
    self.__connection = EmptyConnectionState()
    self.__channels.clear()

get_channel async #

get_channel(channel=None)
Source code in faststream/rabbit/helpers/channel_manager.py
async def get_channel(
    self,
    channel: Optional["Channel"] = None,
) -> "aio_pika.RobustChannel":
    if channel is None:
        channel = self.__default_channel

    if (ch := self.__channels.get(channel)) is None:
        self.__channels[channel] = ch = cast(
            "aio_pika.RobustChannel",
            await self.__connection.connection.channel(
                channel_number=channel.channel_number,
                publisher_confirms=channel.publisher_confirms,
                on_return_raises=channel.on_return_raises,
            ),
        )

        if channel.prefetch_count:
            await ch.set_qos(
                prefetch_count=channel.prefetch_count,
                global_=channel.global_qos,
            )

    return ch