Skip to content

get_asgi_routes

faststream.asyncapi.generate.get_asgi_routes

get_asgi_routes(app)

Get the ASGI routes for an application.

Source code in faststream/asyncapi/generate.py
def get_asgi_routes(app: "AsyncAPIApplication") -> Dict[str, Channel]:
    """Collect AsyncAPI channels for the HTTP handlers mounted on an ASGI app.

    Args:
        app: The application whose routes are inspected.

    Returns:
        A mapping of route path to ``Channel`` for every route whose ASGI
        app is an ``HttpHandler`` with a truthy ``include_in_schema``.
        The mapping is empty when ``app`` is not an ``AsgiFastStream``.
    """
    # Deferred imports: the dependency chain is
    # ASGI > Application > asyncapi.proto, so importing these at module
    # level would look like a circular import.
    from faststream.asgi import AsgiFastStream
    from faststream.asgi.handlers import HttpHandler

    if not isinstance(app, AsgiFastStream):
        return {}

    result: Dict[str, Channel] = {}

    for path, handler in app.routes:
        # Only HTTP handlers that opted into the schema are documented.
        if not isinstance(handler, HttpHandler):
            continue
        if not handler.include_in_schema:
            continue

        description = handler.description
        tags = handler.tags
        operation_id = handler.unique_id
        # All accepted HTTP methods are rendered as one comma-separated string.
        joined_methods = ", ".join(handler.methods)

        http_binding = http_bindings.OperationBinding(method=joined_methods)
        operation = Operation(
            tags=tags,
            operationId=operation_id,
            bindings=OperationBinding(http=http_binding),
        )
        result[path] = Channel(description=description, subscribe=operation)

    return result