def get_broker_channels(
broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> tuple[dict[str, Channel], dict[str, Operation]]:
"""Get the broker channels for an application."""
channels = {}
operations = {}
for sub in filter(lambda s: s.specification.include_in_schema, broker.subscribers):
for sub_key, sub_channel in sub.schema().items():
channel_obj = Channel.from_sub(sub_key, sub_channel)
channel_key = clear_key(sub_key)
if channel_key in channels:
warnings.warn(
f"Overwrite channel handler, channels have the same names: `{channel_key}`",
RuntimeWarning,
stacklevel=1,
)
channels[channel_key] = channel_obj
operations[f"{channel_key}Subscribe"] = Operation.from_sub(
messages=[
Reference(**{
"$ref": f"#/channels/{channel_key}/messages/{msg_name}",
})
for msg_name in channel_obj.messages
],
channel=Reference(**{"$ref": f"#/channels/{channel_key}"}),
operation=sub_channel.operation,
)
for pub in filter(lambda p: p.specification.include_in_schema, broker.publishers):
for pub_key, pub_channel in pub.schema().items():
channel_obj = Channel.from_pub(pub_key, pub_channel)
channel_key = clear_key(pub_key)
if channel_key in channels:
warnings.warn(
f"Overwrite channel handler, channels have the same names: `{channel_key}`",
RuntimeWarning,
stacklevel=1,
)
channels[channel_key] = channel_obj
operations[channel_key] = Operation.from_pub(
messages=[
Reference(**{
"$ref": f"#/channels/{channel_key}/messages/{msg_name}",
})
for msg_name in channel_obj.messages
],
channel=Reference(**{"$ref": f"#/channels/{channel_key}"}),
operation=pub_channel.operation,
)
return channels, operations