Get the broker server for an application.
Source code in faststream/specification/asyncapi/v3_0_0/generate.py
def get_broker_server(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Server]:
    """Build the server entries described by a broker's specification.

    One ``Server`` is created for every entry in ``broker.specification.url``.
    All entries share the same protocol, protocol version, description, tags
    and, when the specification defines one, security requirement.

    Args:
        broker: Broker whose ``specification`` attribute supplies the
            connection details.

    Returns:
        Mapping of server name to ``Server``. The single-URL case uses the
        name ``"development"``; otherwise servers are named ``"Server1"``,
        ``"Server2"``, ... following the order of the URLs.
    """
    spec = broker.specification

    # Falsy tags (missing or empty) are passed on as None rather than [].
    converted_tags: list[Tag | dict[str, Any]] | None = (
        [Tag.from_spec(item) for item in spec.tags] if spec.tags else None
    )

    # Fields that are identical for every server of this broker.
    # TODO: "variables" and "bindings" are not populated yet.
    shared_fields: dict[str, Any] = {
        "protocol": spec.protocol,
        "protocolVersion": spec.protocol_version,
        "description": spec.description,
        "tags": converted_tags,
    }
    if spec.security is not None:
        shared_fields["security"] = spec.security.get_requirement()

    only_one_url = len(spec.url) == 1

    result: dict[str, Server] = {}
    for position, raw_url in enumerate(spec.url, start=1):
        # urlparse only recognises a netloc that is introduced by "//", so
        # URLs given without a scheme get that prefix before parsing.
        if "://" in raw_url:
            parseable = raw_url
        else:
            parseable = f"//{raw_url}"
        parts = urlparse(parseable)

        if only_one_url:
            name = "development"
        else:
            name = f"Server{position}"

        result[name] = Server(
            host=parts.netloc,
            pathname=parts.path,
            **shared_fields,
        )

    return result