Skip to content

get_broker_channels

faststream.specification.asyncapi.v2_6_0.generate.get_broker_channels #

get_broker_channels(broker)

Get the broker channels for an application.

Source code in faststream/specification/asyncapi/v2_6_0/generate.py
def get_broker_channels(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Channel]:
    """Get the broker channels for an application."""
    channels = {}

    for s in filter(lambda s: s.specification.include_in_schema, broker.subscribers):
        for key, sub in s.schema().items():
            if key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            channels[key] = Channel.from_sub(sub)

    for p in filter(lambda p: p.specification.include_in_schema, broker.publishers):
        for key, pub in p.schema().items():
            if key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            channels[key] = Channel.from_pub(pub)

    return channels