Skip to content

get_broker_server

faststream.specification.asyncapi.v3_0_0.generate.get_broker_server

get_broker_server(broker)

Get the broker server for an application.

Source code in faststream/specification/asyncapi/v3_0_0/generate.py
def get_broker_server(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Server]:
    """Build the AsyncAPI server entries described by a broker's specification.

    Every URL in ``broker.specification.url`` yields one ``Server`` object.
    All of them share the same protocol, protocol version, description,
    tags and (when present) security requirement.

    Args:
        broker: The broker whose ``specification`` attribute is read.

    Returns:
        A mapping from server name to ``Server``. When the specification
        lists exactly one URL the entry is named ``"development"``;
        otherwise entries are named ``"Server1"``, ``"Server2"``, ... in
        the order the URLs are listed.
    """
    spec = broker.specification

    # Tags stay ``None`` (rather than an empty list) when none are declared.
    converted_tags: list[Tag | dict[str, Any]] | None = (
        [Tag.from_spec(item) for item in spec.tags] if spec.tags else None
    )

    shared_fields: dict[str, Any] = {
        "protocol": spec.protocol,
        "protocolVersion": spec.protocol_version,
        "description": spec.description,
        "tags": converted_tags,
        # TODO
        # "variables": "",
        # "bindings": "",
    }

    # The security key is only added when a security scheme is configured.
    if spec.security is not None:
        shared_fields["security"] = spec.security.get_requirement()

    urls = spec.url
    only_one = len(urls) == 1

    result: dict[str, Server] = {}
    for position, raw_url in enumerate(urls, start=1):
        # Without a scheme, a leading "//" is needed so that urlparse
        # reports the address in ``netloc`` instead of ``path``.
        normalized = raw_url
        if "://" not in raw_url:
            normalized = f"//{raw_url}"

        parts = urlparse(normalized)

        if only_one:
            name = "development"
        else:
            name = f"Server{position}"

        result[name] = Server(
            host=parts.netloc,
            pathname=parts.path,
            **shared_fields,
        )

    return result