Skip to content

make_ping_asgi

faststream.asgi.factories.make_ping_asgi

make_ping_asgi(
    broker,
    /,
    timeout=None,
    include_in_schema=True,
    description=None,
    tags=None,
    unique_id=None,
)
Source code in faststream/asgi/factories.py
def make_ping_asgi(
    broker: "BrokerUsecase[Any, Any]",
    /,
    timeout: float | None = None,
    include_in_schema: bool = True,
    description: str | None = None,
    tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None,
    unique_id: str | None = None,
) -> ASGIApp:
    """Build a GET handler that reports the outcome of ``broker.ping``.

    Args:
        broker: Object whose awaitable ``ping(timeout)`` is invoked on each
            call of the handler.
        timeout: Passed unchanged as the only argument to ``broker.ping``.
        include_in_schema: Forwarded to the ``get`` decorator.
        description: Forwarded to the ``get`` decorator.
        tags: Forwarded to the ``get`` decorator.
        unique_id: Forwarded to the ``get`` decorator.

    Returns:
        The ``ping`` coroutine function as wrapped by ``get``. When awaited,
        the handler yields the response built as ``AsgiResponse(b"", 204)``
        if ``broker.ping`` returned a truthy value, otherwise the one built
        as ``AsgiResponse(b"", 500)``.
    """
    # Both responses are created once here and reused by every handler call.
    ok_response = AsgiResponse(b"", 204)
    failed_response = AsgiResponse(b"", 500)

    # Configure the route wrapper before defining the handler, which is the
    # same evaluation order as the decorator syntax.
    as_get_route = get(
        include_in_schema=include_in_schema,
        description=description,
        tags=tags,
        unique_id=unique_id,
    )

    async def ping(scope: Scope) -> AsgiResponse:
        is_alive = await broker.ping(timeout)
        return ok_response if is_alive else failed_response

    return as_get_route(ping)