Skip to content

get_app_schema

faststream.specification.asyncapi.v3_0_0.generate.get_app_schema #

get_app_schema(
    *brokers: BrokerUsecase[Any, Any],
    title: str,
    app_version: str,
    schema_version: str,
    description: str | None,
    terms_of_service: Optional[AnyHttpUrl],
    contact: Union[Contact, ContactDict, dict[str, Any]]
    | None,
    license: Union[License, LicenseDict, dict[str, Any]]
    | None,
    identifier: str | None,
    tags: Sequence[Union[Tag, TagDict, dict[str, Any]]]
    | None,
    external_docs: Union[
        ExternalDocs, ExternalDocsDict, dict[str, Any]
    ]
    | None,
    http_handlers: list[tuple[str, HttpHandler]],
) -> ApplicationSchema

Get the application schema.

Source code in faststream/specification/asyncapi/v3_0_0/generate.py
def get_app_schema(
    *brokers: "BrokerUsecase[Any, Any]",
    title: str,
    app_version: str,
    schema_version: str,
    description: str | None,
    terms_of_service: Optional["AnyHttpUrl"],
    contact: Union["SpecContact", "ContactDict", dict[str, Any]] | None,
    license: Union["SpecLicense", "LicenseDict", dict[str, Any]] | None,
    identifier: str | None,
    tags: Sequence[Union["SpecTag", "TagDict", dict[str, Any]]] | None,
    external_docs: Union["SpecDocs", "ExternalDocsDict", dict[str, Any]] | None,
    http_handlers: list[tuple[str, "HttpHandler"]],
) -> ApplicationSchema:
    """Get the application schema."""
    if any(br.specification.security for br in brokers):
        list_of_specification_security = (
            br.specification.security.get_schema()
            for br in brokers
            if br.specification.security
        )
        security_schemes: dict[str, dict[str, Any]] | None = convert_list_of_dict_to_dict(
            list_of_specification_security,
            "specification security",
        )
    else:
        security_schemes = None

    servers, broker_servers = get_broker_server(*brokers)

    list_of_channels_operations = [
        get_broker_channels(br, servers=srv) for br, srv in broker_servers.items()
    ]
    list_of_channels = (itchannel for itchannel, _ in list_of_channels_operations)
    list_of_operations = (itoperation for _, itoperation in list_of_channels_operations)

    channels = convert_list_of_dict_to_dict(list_of_channels, "channel")
    operations = convert_list_of_dict_to_dict(list_of_operations, "operation")

    messages: dict[str, Message] = {}
    payloads: dict[str, dict[str, Any]] = {}

    added_channels, added_operations = get_asgi_routes(http_handlers)
    channels.update(added_channels)
    operations.update(added_operations)

    for channel_name, channel in channels.items():
        msgs: dict[str, Message | Reference] = {}
        for message_name, message in channel.messages.items():
            assert isinstance(message, Message)

            msgs[message_name] = _resolve_msg_payloads(
                message_name,
                message,
                channel_name,
                payloads,
                messages,
            )

        channel.messages = msgs

    return ApplicationSchema(
        info=ApplicationInfo(
            title=title,
            version=app_version,
            description=description,
            termsOfService=terms_of_service,
            contact=Contact.from_spec(contact),
            license=License.from_spec(license),
            tags=[Tag.from_spec(tag) for tag in tags] or None if tags else None,
            externalDocs=ExternalDocs.from_spec(external_docs),
        ),
        asyncapi=schema_version,
        defaultContentType=ContentTypes.JSON.value,
        id=identifier,
        servers=servers,
        channels=channels,
        operations=operations,
        components=Components(
            messages=messages,
            schemas=payloads,
            securitySchemes=security_schemes,
        ),
    )