make_try_it_out_handler(
broker: BrokerUsecase[Any, Any],
description: str | None = None,
tags: Sequence[Union[Tag, TagDict, dict[str, Any]]]
| None = None,
unique_id: str | None = None,
include_in_schema: bool = False,
) -> PostHandler
Create POST handler for asyncapi-try-it-plugin to publish messages to broker.
Source code in faststream/asgi/factories/asyncapi/try_it_out.py
| def make_try_it_out_handler(
broker: "BrokerUsecase[Any, Any]",
description: str | None = None,
tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None,
unique_id: str | None = None,
include_in_schema: bool = False,
) -> "PostHandler":
"""Create POST handler for asyncapi-try-it-plugin to publish messages to broker."""
processor = TryItOutProcessor(broker)
@post(
description=description,
tags=tags,
unique_id=unique_id,
include_in_schema=include_in_schema,
)
async def try_it_out(request: Request) -> AsgiResponse:
try:
body: TryItOutForm = await request.json()
except Exception as e:
return JSONResponse({"details": f"Invalid JSON: {e}"}, 400)
return await processor.process(body)
return try_it_out
|