Skip to content

make_try_it_out_handler

faststream.asgi.factories.make_try_it_out_handler #

make_try_it_out_handler(
    brokers: Iterable[BrokerUsecase[Any, Any]],
    description: str | None = None,
    tags: Sequence[Union[Tag, TagDict, dict[str, Any]]] | None = None,
    unique_id: str | None = None,
    include_in_schema: bool = False,
) -> PostHandler

Creates the POST handler used by asyncapi-try-it-plugin. When several brokers own the same channel, the first owner wins.

Source code in faststream/asgi/factories/asyncapi/try_it_out.py
def make_try_it_out_handler(
    brokers: Iterable["BrokerUsecase[Any, Any]"],
    description: str | None = None,
    tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None,
    unique_id: str | None = None,
    include_in_schema: bool = False,
) -> "PostHandler":
    """Build the POST endpoint used by asyncapi-try-it-plugin.

    Every broker is handed to one shared ``TryItOutProcessor``; the first
    owner of the channel wins.

    Args:
        brokers: Brokers unpacked positionally into ``TryItOutProcessor``.
        description: Forwarded unchanged to the ``post`` decorator.
        tags: Forwarded unchanged to the ``post`` decorator.
        unique_id: Forwarded unchanged to the ``post`` decorator.
        include_in_schema: Forwarded unchanged to the ``post`` decorator.

    Returns:
        The ``try_it_out`` coroutine after it has been wrapped by ``post``.
    """
    processor = TryItOutProcessor(*brokers)

    # Configure the route decorator once, then apply it to the handler below.
    as_post_route = post(
        description=description,
        tags=tags,
        unique_id=unique_id,
        include_in_schema=include_in_schema,
    )

    # NOTE(review): the handler keeps its original name and has no docstring on
    # purpose - the decorator is not visible here and may read either; confirm.
    @as_post_route
    async def try_it_out(request: Request) -> AsgiResponse:
        try:
            payload: TryItOutForm = await request.json()
        except Exception as exc:
            # Any failure while reading/decoding the body is reported to the
            # client as an error response built with 400.
            error_body = {"details": f"Invalid JSON: {exc}"}
            return JSONResponse(error_body, 400)
        else:
            # Body decoded: delegate the actual work to the shared processor.
            return await processor.process(payload)

    return try_it_out