Skip to content

TryItOutProcessor

faststream.asgi.factories.asyncapi.TryItOutProcessor #

TryItOutProcessor(broker: BrokerUsecase[Any, Any])

Process try-it-out requests: parse, validate, publish to real or test broker.

Source code in faststream/asgi/factories/asyncapi/try_it_out.py
def __init__(self, broker: "BrokerUsecase[Any, Any]") -> None:
    """Store the broker and pick the test broker class registered for it.

    The registry returned by ``_get_broker_registry()`` is scanned in
    iteration order; the first entry whose key class the broker is an
    instance of wins.

    Args:
        broker: Broker instance that try-it-out requests are sent through.

    Raises:
        ValueError: If no registry entry matches the broker's type.
    """
    self._broker = broker

    for candidate_type, candidate_test_cls in _get_broker_registry().items():
        if not isinstance(broker, candidate_type):
            continue
        # First match wins; nothing else to set up.
        self._test_broker_cls = candidate_test_cls
        return

    msg = f"TestBroker not available for {broker}. Please, inspect your dependencies."
    raise ValueError(msg)

process async #

process(body: TryItOutForm) -> AsgiResponse

Process parsed body: validate, dry-run or publish. Returns response.

Source code in faststream/asgi/factories/asyncapi/try_it_out.py
async def process(self, body: TryItOutForm) -> AsgiResponse:
    """Handle one try-it-out request and build the HTTP response.

    The destination is the part of ``channelName`` before the first ``:``.
    The payload is read from ``body["message"]["message"]``. When
    ``options.sendToRealBroker`` is truthy the payload is published through
    the wrapped broker; otherwise the broker is wrapped in its test broker
    class and a request is issued against it.

    Args:
        body: Parsed request form.

    Returns:
        A JSON response with status 200 on success (``"ok"`` or the decoded
        reply), 400 if the destination is empty, 404 if
        ``SubscriberNotFound`` is raised, and 500 with the exception text
        for any other exception raised while publishing.
    """
    channel_name = body.get("channelName", "")
    destination = channel_name.split(":")[0]
    if not destination:
        return JSONResponse({"details": "Missing channelName"}, 400)

    payload: Any = body.get("message", {}).get("message")
    send_to_real = body.get("options", {}).get("sendToRealBroker", False)

    try:
        if send_to_real:
            await self._broker.publish(payload, destination)
            return JSONResponse("ok", 200)

        async with self._test_broker_cls(self._broker) as test_broker:
            reply = await test_broker.request(payload, destination)

            # Decoding is best-effort: any failure falls back to a plain "ok".
            decoded = None
            with suppress(Exception):
                decoded = await reply.decode()

            has_content = decoded is not None and decoded != b""
            # Build the response while the test broker context is still open.
            return JSONResponse(decoded if has_content else "ok", 200)

    except SubscriberNotFound:
        return JSONResponse({"details": f"{destination} destination not found."}, 404)

    except Exception as e:
        return JSONResponse({"details": str(e)}, 500)