Skip to content

get_broker_channels

faststream.specification.asyncapi.v3_0_0.generate.get_broker_channels #

get_broker_channels(broker)

Get the broker channels for an application.

Source code in faststream/specification/asyncapi/v3_0_0/generate.py
def get_broker_channels(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> tuple[dict[str, Channel], dict[str, Operation]]:
    """Get the broker channels for an application."""
    channels = {}
    operations = {}

    for sub in filter(lambda s: s.specification.include_in_schema, broker.subscribers):
        for sub_key, sub_channel in sub.schema().items():
            channel_obj = Channel.from_sub(sub_key, sub_channel)

            channel_key = clear_key(sub_key)
            if channel_key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{channel_key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            channels[channel_key] = channel_obj

            operations[f"{channel_key}Subscribe"] = Operation.from_sub(
                messages=[
                    Reference(**{
                        "$ref": f"#/channels/{channel_key}/messages/{msg_name}",
                    })
                    for msg_name in channel_obj.messages
                ],
                channel=Reference(**{"$ref": f"#/channels/{channel_key}"}),
                operation=sub_channel.operation,
            )

    for pub in filter(lambda p: p.specification.include_in_schema, broker.publishers):
        for pub_key, pub_channel in pub.schema().items():
            channel_obj = Channel.from_pub(pub_key, pub_channel)

            channel_key = clear_key(pub_key)
            if channel_key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{channel_key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )
            channels[channel_key] = channel_obj

            operations[channel_key] = Operation.from_pub(
                messages=[
                    Reference(**{
                        "$ref": f"#/channels/{channel_key}/messages/{msg_name}",
                    })
                    for msg_name in channel_obj.messages
                ],
                channel=Reference(**{"$ref": f"#/channels/{channel_key}"}),
                operation=pub_channel.operation,
            )

    return channels, operations