Get the broker channels for an application.
Source code in faststream/specification/asyncapi/v2_6_0/generate.py
def get_broker_channels(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Channel]:
    """Collect the channels exposed by a broker's subscribers and publishers.

    Every subscriber and publisher whose ``specification.include_in_schema``
    is truthy contributes the entries of its ``schema()`` mapping, converted
    with ``Channel.from_sub`` / ``Channel.from_pub`` respectively.

    Args:
        broker: Broker whose ``subscribers`` and ``publishers`` are inspected.

    Returns:
        A mapping from channel name to the ``Channel`` built for it.

    Warns:
        RuntimeWarning: When a channel name is already present in the result.
            The newer entry replaces the older one.
    """
    channels: dict[str, Channel] = {}

    for subscriber in broker.subscribers:
        if not subscriber.specification.include_in_schema:
            continue

        for key, sub_spec in subscriber.schema().items():
            if key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            channels[key] = Channel.from_sub(sub_spec)

    # Publishers are handled after subscribers, so on a name collision the
    # publisher-derived channel is the one that ends up in the result.
    for publisher in broker.publishers:
        if not publisher.specification.include_in_schema:
            continue

        for key, pub_spec in publisher.schema().items():
            if key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            channels[key] = Channel.from_pub(pub_spec)

    return channels
|