def make_ping_asgi(
    broker: "BrokerUsecase[Any, Any]",
    /,
    timeout: Optional[float] = None,
    include_in_schema: bool = True,
    description: Optional[str] = None,
    tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
    unique_id: Optional[str] = None,
) -> "ASGIApp":
    """Create a GET endpoint that reports the outcome of ``broker.ping``.

    The returned handler awaits ``broker.ping(timeout)`` on every call and
    answers with an empty-bodied response: status 204 when the result is
    truthy, status 500 otherwise.

    Args:
        broker: Object whose awaitable ``ping`` method is queried
            (positional-only).
        timeout: Value passed unchanged to ``broker.ping``.
        include_in_schema: Forwarded unchanged to the ``get`` decorator.
        description: Forwarded unchanged to the ``get`` decorator.
        tags: Forwarded unchanged to the ``get`` decorator.
        unique_id: Forwarded unchanged to the ``get`` decorator.

    Returns:
        Whatever ``get(...)`` produces when applied to the inner ``ping``
        coroutine.
    """
    # Both replies are constructed once, here, and the very same objects are
    # handed back on every request.
    reply_ok = AsgiResponse(b"", 204)
    reply_failed = AsgiResponse(b"", 500)

    as_get_endpoint = get(
        include_in_schema=include_in_schema,
        description=description,
        tags=tags,
        unique_id=unique_id,
    )

    # NOTE(review): the coroutine keeps the name ``ping`` and carries no
    # docstring of its own, in case ``get`` inspects ``__name__``/``__doc__``
    # -- confirm against the decorator's implementation before changing that.
    async def ping(scope: "Scope") -> AsgiResponse:
        is_alive = await broker.ping(timeout)
        return reply_ok if is_alive else reply_failed

    return as_get_endpoint(ping)