Get the ASGI routes for an application.
Source code in faststream/specification/asyncapi/v3_0_0/generate.py
| def get_asgi_routes(
http_handlers: list[tuple[str, "HttpHandler"]],
) -> tuple[dict[str, Channel], dict[str, Operation]]:
"""Get the ASGI routes for an application."""
channels: dict[str, Channel] = {}
operations: dict[str, Operation] = {}
for path, asgi_app in http_handlers:
if asgi_app.include_in_schema:
channel = Channel(
description=asgi_app.description,
address=path,
messages={},
)
channel_name = "".join(
char
for char in path.strip("/").replace("/", "_")
if char in string.ascii_letters + string.digits + "_"
)
channel_name = f"{channel_name}:HttpChannel"
channels[channel_name] = channel
operation = Operation(
action=Action.RECEIVE,
channel=Reference(**{"$ref": f"#/channels/{channel_name}"}),
bindings=OperationBinding(
http=http_bindings.OperationBinding(
method=_get_http_binding_method(asgi_app.methods),
bindingVersion="0.3.0",
),
),
)
operations[channel_name] = operation
return channels, operations
|