Skip to content

make_ping_asgi

faststream.asgi.factories.make_ping_asgi #

make_ping_asgi(
    broker,
    /,
    timeout=None,
    include_in_schema=True,
    description=None,
    tags=None,
    unique_id=None,
)
Source code in faststream/asgi/factories.py
def make_ping_asgi(
    broker: "BrokerUsecase[Any, Any]",
    /,
    timeout: Optional[float] = None,
    include_in_schema: bool = True,
    description: Optional[str] = None,
    tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
    unique_id: Optional[str] = None,
) -> "ASGIApp":
    healthy_response = AsgiResponse(b"", 204)
    unhealthy_response = AsgiResponse(b"", 500)

    @get(
        include_in_schema=include_in_schema,
        description=description,
        tags=tags,
        unique_id=unique_id,
    )
    async def ping(scope: "Scope") -> AsgiResponse:
        if await broker.ping(timeout):
            return healthy_response
        return unhealthy_response

    return ping