Get the ASGI routes for an application.
Source code in faststream/asyncapi/generate.py
| def get_asgi_routes(app: "AsyncAPIApplication") -> Dict[str, Channel]:
"""Get the ASGI routes for an application."""
# We should import this here due
# ASGI > Application > asynciapi.proto
# so it looks like a circular import
from faststream.asgi import AsgiFastStream
from faststream.asgi.handlers import HttpHandler
if not isinstance(app, AsgiFastStream):
return {}
channels: Dict[str, Channel] = {}
for route in app.routes:
path, asgi_app = route
if isinstance(asgi_app, HttpHandler) and asgi_app.include_in_schema:
channel = Channel(
description=asgi_app.description,
subscribe=Operation(
tags=asgi_app.tags,
operationId=asgi_app.unique_id,
bindings=OperationBinding(
http=http_bindings.OperationBinding(
method=", ".join(asgi_app.methods)
)
),
),
)
channels[path] = channel
return channels
|