Skip to content

TryItOutProcessor

faststream.asgi.factories.TryItOutProcessor #

TryItOutProcessor(*brokers: BrokerUsecase[Any, Any])

Dispatch try-it-out requests by the exact AsyncAPI channel when possible.

Source code in faststream/asgi/factories/asyncapi/try_it_out.py
def __init__(self, *brokers: "BrokerUsecase[Any, Any]") -> None:
    """Pair every given broker with the test broker class registered for it.

    Args:
        *brokers: Brokers that try-it-out requests may be dispatched to.

    Raises:
        ValueError: If a broker matches no class in the broker registry,
            or if no brokers were passed at all.
    """
    known_brokers = _get_broker_registry()
    self._entries: list[
        tuple[BrokerUsecase[Any, Any], type[TestBroker[Any, Any]]]
    ] = []

    for candidate in brokers:
        # The first registry class the broker is an instance of wins; the
        # generator stops there, so later registry entries are not inspected.
        pairing = next(
            (
                (candidate, test_cls)
                for broker_cls, test_cls in known_brokers.items()
                if isinstance(candidate, broker_cls)
            ),
            None,
        )
        if pairing is None:
            msg = f"TestBroker not available for {candidate}. Please, inspect your dependencies."
            raise ValueError(msg)
        self._entries.append(pairing)

    if not self._entries:
        msg = "TryItOutProcessor requires at least one broker."
        raise ValueError(msg)

process async #

process(body: TryItOutForm) -> AsgiResponse

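Process the parsed request body: validate the channel name, then either dry-run the message against a test broker or publish it to the real broker. Returns the resulting response.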

Source code in faststream/asgi/factories/asyncapi/try_it_out.py
async def process(self, body: TryItOutForm) -> AsgiResponse:
    """Handle one try-it-out request and build the response for it.

    The destination is the part of ``channelName`` before the first ``:``.
    With a single registered broker that broker is always used; otherwise the
    broker is looked up by the full channel name first and by the destination
    second.

    Args:
        body: Parsed form; ``channelName``, ``message`` and ``options`` are
            the keys read here.

    Returns:
        400 when the destination is empty, 404 when no broker or subscriber
        matches it, 500 carrying ``repr`` of any other exception, otherwise
        200 with ``"ok"`` or the decoded reply of the dry-run request.
    """
    channel = body.get("channelName", "")
    destination = channel.partition(":")[0]

    if not destination:
        return JSONResponse({"details": "Missing channelName"}, 400)

    if len(self._entries) == 1:
        selected = self._entries[0]
    else:
        selected = None

        # First pass: exact match on the full channel name.
        for candidate in self._entries:
            if channel in _iter_broker_channels(candidate[0]):
                selected = candidate
                break

        # Second pass: fall back to matching on the bare destination.
        if selected is None:
            for candidate in self._entries:
                if destination in _iter_broker_destinations(candidate[0]):
                    selected = candidate
                    break

        if selected is None:
            return JSONResponse(
                {"details": f"{destination} destination not found."}, 404
            )

    broker, test_broker_cls = selected

    message_section = body.get("message", {})
    payload: Any = message_section.get("message")
    options = body.get("options", {})
    use_real_broker = options.get("sendToRealBroker", False)

    try:
        if use_real_broker:
            await broker.publish(payload, destination)
            return JSONResponse("ok", 200)

        # Every other registered broker sharing this test broker class is
        # patched together with the selected one, which always comes first.
        siblings = [
            other
            for other, other_cls in self._entries
            if other_cls is test_broker_cls and other is not broker
        ]
        # ``test_broker_cls`` is annotated with the abstract ``TestBroker``
        # base, and its overloaded ``__init__`` would lead mypy to treat the
        # call as instantiating an abstract class. The runtime value is always
        # a concrete subclass and the context value is unused, so the class is
        # invoked through a plain callable type instead.
        make_test_broker = cast(
            "Callable[..., TestBroker[Any, Any]]", test_broker_cls
        )
        async with make_test_broker(broker, *siblings):
            reply = await broker.request(payload, destination, timeout=30)

            # Decoding is best-effort: any failure leaves ``decoded`` as None.
            decoded = None
            with suppress(Exception):
                decoded = await reply.decode()

            has_content = decoded is not None and decoded != b""
            return JSONResponse(decoded if has_content else "ok", 200)

    except SubscriberNotFound:
        return JSONResponse({"details": f"{destination} destination not found."}, 404)

    except Exception as e:
        return JSONResponse({"details": repr(e)}, 500)