Skip to content

make_try_it_out_handler

faststream.asgi.factories.make_try_it_out_handler

make_try_it_out_handler(
    broker: BrokerUsecase[Any, Any],
    description: str | None = None,
    tags: Sequence[Union[Tag, TagDict, dict[str, Any]]]
    | None = None,
    unique_id: str | None = None,
    include_in_schema: bool = False,
) -> PostHandler

Create a POST handler for asyncapi-try-it-plugin that publishes messages to the broker.

Source code in faststream/asgi/factories/asyncapi/try_it_out.py
def make_try_it_out_handler(
    broker: "BrokerUsecase[Any, Any]",
    description: str | None = None,
    tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None,
    unique_id: str | None = None,
    include_in_schema: bool = False,
) -> "PostHandler":
    """Build the POST endpoint used by asyncapi-try-it-plugin to publish to the broker.

    Args:
        broker: Broker handed to the ``TryItOutProcessor`` that handles each
            request body.
        description: Forwarded unchanged to the ``post`` decorator.
        tags: Forwarded unchanged to the ``post`` decorator.
        unique_id: Forwarded unchanged to the ``post`` decorator.
        include_in_schema: Forwarded unchanged to the ``post`` decorator.

    Returns:
        The ``try_it_out`` coroutine wrapped by ``post``.
    """
    # One processor is created per factory call and shared by every request
    # served by the returned handler.
    form_processor = TryItOutProcessor(broker)

    as_post_route = post(
        description=description,
        tags=tags,
        unique_id=unique_id,
        include_in_schema=include_in_schema,
    )

    async def try_it_out(request: Request) -> AsgiResponse:
        # Only reading the body is guarded: any failure there becomes an error
        # response, while errors raised by the processor propagate unchanged.
        try:
            payload: TryItOutForm = await request.json()
        except Exception as exc:
            return JSONResponse({"details": f"Invalid JSON: {exc}"}, 400)
        else:
            return await form_processor.process(payload)

    return as_post_route(try_it_out)