Skip to content

get_broker_server

faststream.specification.asyncapi.v2_6_0.generate.get_broker_server #

get_broker_server(
    *brokers: BrokerUsecase[Any, Any],
) -> tuple[
    dict[str, Server],
    dict[BrokerUsecase[Any, Any], list[str]],
]

Get the broker server for an application.

Source code in faststream/specification/asyncapi/v2_6_0/generate.py
def get_broker_server(
    *brokers: "BrokerUsecase[Any, Any]",
) -> tuple[
    dict[str, Server],
    dict["BrokerUsecase[Any, Any]", list[str]],
]:
    """Get the broker server for an application."""
    servers: list[Server] = []
    broker_servers: list[tuple[BrokerUsecase[Any, Any], Server]] = []

    for broker in brokers:
        specification = broker.specification

        broker_meta: dict[str, Any] = {
            "protocol": specification.protocol,
            "protocolVersion": specification.protocol_version,
            "description": specification.description,
            "tags": [Tag.from_spec(tag) for tag in specification.tags] or None,
            "security": specification.security.get_requirement()
            if specification.security
            else None,
            # TODO
            # "variables": "",
            # "bindings": "",
        }

        for url in specification.url:
            server = Server(url=url, **broker_meta)
            # deduplicate servers
            broker_servers.append((broker, server))
            if server not in servers:
                servers.append(server)

    servers_by_names: dict[str, Server] = {}
    single_server = len(servers) == 1
    for i, server in enumerate(servers, 1):
        server_name = "development" if single_server else f"Server{i}"
        servers_by_names[server_name] = server

    broker_server_names: dict[BrokerUsecase[Any, Any], list[str]] = {}
    for name, server in servers_by_names.items():
        for broker, br_server in broker_servers:
            if server == br_server:
                broker_server_names.setdefault(broker, []).append(name)

    return servers_by_names, broker_server_names