Skip to content

get_asgi_routes

faststream.specification.asyncapi.v3_0_0.generate.get_asgi_routes #

get_asgi_routes(http_handlers)

Get the ASGI routes for an application.

Source code in faststream/specification/asyncapi/v3_0_0/generate.py
def get_asgi_routes(
    http_handlers: list[tuple[str, "HttpHandler"]],
) -> tuple[dict[str, Channel], dict[str, Operation]]:
    """Get the ASGI routes for an application."""
    channels: dict[str, Channel] = {}
    operations: dict[str, Operation] = {}
    for path, asgi_app in http_handlers:
        if asgi_app.include_in_schema:
            channel = Channel(
                description=asgi_app.description,
                address=path,
                messages={},
            )
            channel_name = "".join(
                char
                for char in path.strip("/").replace("/", "_")
                if char in string.ascii_letters + string.digits + "_"
            )
            channel_name = f"{channel_name}:HttpChannel"
            channels[channel_name] = channel
            operation = Operation(
                action=Action.RECEIVE,
                channel=Reference(**{"$ref": f"#/channels/{channel_name}"}),
                bindings=OperationBinding(
                    http=http_bindings.OperationBinding(
                        method=_get_http_binding_method(asgi_app.methods),
                        bindingVersion="0.3.0",
                    ),
                ),
            )
            operations[channel_name] = operation
    return channels, operations