Skip to content

get_broker_channels

faststream.specification.asyncapi.v3_0_0.generate.get_broker_channels

get_broker_channels(
    broker: BrokerUsecase[MsgType, ConnectionType],
    servers: list[str] | None = None,
) -> tuple[dict[str, Channel], dict[str, Operation]]

Get the broker channels for an application.

Source code in faststream/specification/asyncapi/v3_0_0/generate.py
def get_broker_channels(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
    servers: list[str] | None = None,
) -> tuple[dict[str, Channel], dict[str, Operation]]:
    """Collect the AsyncAPI channels and operations exposed by a broker.

    Every subscriber and publisher whose ``specification.include_in_schema``
    flag is truthy contributes one channel and one operation per entry of its
    ``schema()`` mapping. Subscribers are processed before publishers, and an
    entry stored later replaces an earlier one kept under the same key.

    Args:
        broker: Broker whose ``subscribers`` and ``publishers`` are inspected.
        servers: Names of the servers each channel should reference. ``None``
            or an empty list results in ``servers=None`` being handed to the
            channel builders.

    Returns:
        A ``(channels, operations)`` pair of dictionaries. Channels are keyed
        by ``clear_key`` applied to the schema key. A subscriber operation is
        keyed by ``"<channel key>Subscribe"`` unless the subscriber has a
        custom title (neither ``None`` nor ``"/"``), in which case the title
        is the key. A publisher operation is keyed by its channel key.

    Warns:
        RuntimeWarning: When a channel key is already registered (subscribers
            and publishers), or when a subscriber's operation key is already
            registered. The new entry still replaces the old one.
    """
    channels: dict[str, Channel] = {}
    operations: dict[str, Operation] = {}

    server_refs = [ref("servers", name) for name in (servers or ())]
    if not server_refs:
        # Pass ``None`` rather than an empty list to the channel builders.
        server_refs = None

    for subscriber in broker.subscribers:
        if not subscriber.specification.include_in_schema:
            continue

        for raw_key, spec in subscriber.schema().items():
            built_channel = Channel.from_sub(raw_key, spec, servers=server_refs)

            name = clear_key(raw_key)
            if name in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{name}`",
                    RuntimeWarning,
                    stacklevel=1,
                )
            channels[name] = built_channel

            # A missing title or the placeholder "/" falls back to a key
            # derived from the channel name.
            title = subscriber.specification.config.title_
            if title is None or title == "/":
                op_name = f"{name}Subscribe"
            else:
                op_name = title

            if op_name in operations:
                warnings.warn(
                    f"Overwrite channel handler, operations have the same names: `{op_name}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            message_refs = [
                Reference(**ref("channels", name, "messages", message_name))
                for message_name in built_channel.messages
            ]
            channel_ref = Reference(**ref("channels", name))
            operations[op_name] = Operation.from_sub(
                messages=message_refs,
                channel=channel_ref,
                operation=spec.operation,
            )

    for publisher in broker.publishers:
        if not publisher.specification.include_in_schema:
            continue

        for raw_key, spec in publisher.schema().items():
            built_channel = Channel.from_pub(raw_key, spec, servers=server_refs)

            name = clear_key(raw_key)
            if name in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{name}`",
                    RuntimeWarning,
                    stacklevel=1,
                )
            channels[name] = built_channel

            # Publisher operations share their key with the channel and are
            # stored without a separate duplicate-operation warning.
            message_refs = [
                Reference(**ref("channels", name, "messages", message_name))
                for message_name in built_channel.messages
            ]
            channel_ref = Reference(**ref("channels", name))
            operations[name] = Operation.from_pub(
                messages=message_refs,
                channel=channel_ref,
                operation=spec.operation,
            )

    return channels, operations