Skip to content

get_broker_server

faststream.specification.asyncapi.v2_6_0.generate.get_broker_server #

get_broker_server(broker)

Get the broker server for an application.

Source code in faststream/specification/asyncapi/v2_6_0/generate.py
def get_broker_server(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Server]:
    """Build the named ``Server`` entries described by a broker's specification.

    Every URL in ``broker.specification.url`` produces one ``Server``. All of
    them share the same protocol, protocol version, description, tags and
    security requirement; only the URL differs.

    Args:
        broker: Broker whose ``specification`` attribute supplies the
            protocol details, tags, security settings and URLs.

    Returns:
        A mapping from server name to ``Server``. When the specification has
        exactly one URL the entry is named ``"development"``; otherwise the
        entries are named ``"Server1"``, ``"Server2"``, ... in URL order.
    """
    spec = broker.specification

    # Fields that are identical for every server generated from this broker.
    shared_fields: dict[str, Any] = {
        "protocol": spec.protocol,
        "protocolVersion": spec.protocol_version,
        "description": spec.description,
    }

    # An empty tag list is reported as None rather than as [].
    tag_models = [Tag.from_spec(item) for item in spec.tags]
    shared_fields["tags"] = tag_models if tag_models else None

    # A security requirement is only emitted when security is configured.
    if spec.security:
        shared_fields["security"] = spec.security.get_requirement()
    else:
        shared_fields["security"] = None

    # TODO: the "variables" and "bindings" server fields are not populated yet.

    # A lone server gets a fixed name; several servers are numbered from 1.
    only_one = len(spec.url) == 1
    return {
        ("development" if only_one else f"Server{position}"): Server(
            url=address,
            **shared_fields,
        )
        for position, address in enumerate(spec.url, start=1)
    }