Get the broker servers for an application, keyed by server name.
Source code in faststream/specification/asyncapi/v2_6_0/generate.py
def get_broker_server(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Server]:
    """Build the named server entries described by a broker's specification.

    One ``Server`` is created for every URL in ``broker.specification.url``.
    All of them share the same protocol, protocol version, description,
    tags and security values; only the URL differs.

    Args:
        broker: Broker whose ``specification`` attribute supplies the
            protocol details, tags, security settings and URLs.

    Returns:
        A mapping of server name to ``Server``. When the specification has
        exactly one URL the single entry is named ``"development"``;
        otherwise entries are named ``"Server1"``, ``"Server2"``, ... in
        URL order. The mapping is empty when there are no URLs.
    """
    spec = broker.specification

    protocol = spec.protocol
    protocol_version = spec.protocol_version
    description = spec.description

    # An empty tag list collapses to None instead of being passed as [].
    converted_tags = [Tag.from_spec(item) for item in spec.tags]

    # Security is optional; only ask for the requirement when it is set.
    if spec.security:
        security_requirement = spec.security.get_requirement()
    else:
        security_requirement = None

    # Fields common to every server entry.
    # TODO: "variables" and "bindings" are not populated yet.
    shared_fields: dict[str, Any] = {
        "protocol": protocol,
        "protocolVersion": protocol_version,
        "description": description,
        "tags": converted_tags or None,
        "security": security_requirement,
    }

    has_one_url = len(spec.url) == 1

    # Numbering starts at 1, so multi-URL names run Server1, Server2, ...
    return {
        ("development" if has_one_url else f"Server{position}"): Server(
            url=address,
            **shared_fields,
        )
        for position, address in enumerate(spec.url, start=1)
    }
|