Process try-it-out requests: parse, validate, publish to real or test broker.
Source code in faststream/asgi/factories/asyncapi/try_it_out.py
def __init__(self, broker: "BrokerUsecase[Any, Any]") -> None:
    """Bind the processor to ``broker`` and resolve its test broker class.

    The broker registry maps broker classes to test broker classes; the
    first entry whose broker class ``broker`` is an instance of is kept for
    later use.

    Raises:
        ValueError: If no registry entry matches ``broker``.
    """
    self._broker = broker

    # A private sentinel distinguishes "nothing matched" from any value the
    # registry could legitimately hold.
    not_found = object()
    resolved = next(
        (
            test_cls
            for broker_cls, test_cls in _get_broker_registry().items()
            if isinstance(self._broker, broker_cls)
        ),
        not_found,
    )

    if resolved is not_found:
        msg = f"TestBroker not available for {broker}. Please, inspect your dependencies."
        raise ValueError(msg)

    self._test_broker_cls = resolved
|
process async
Process parsed body: validate, dry-run or publish. Returns response.
Source code in faststream/asgi/factories/asyncapi/try_it_out.py
async def process(self, body: TryItOutForm) -> AsgiResponse:
    """Validate a parsed try-it-out form, then dry-run or publish it.

    Args:
        body: Parsed request form. Reads ``channelName`` (only the part
            before the first ``:`` is used as the destination), ``message``
            (a wrapper whose own ``message`` key holds the payload) and
            ``options`` (``sendToRealBroker`` selects the real broker).

    Returns:
        A JSON response:

        * 400 when ``channelName`` is missing/empty/not a string, or when
          ``message`` / ``options`` are present but not objects;
        * 404 when ``SubscriberNotFound`` is raised for the destination;
        * 500 with the exception text for any other failure;
        * 200 with ``"ok"`` after a real publish, or with the decoded reply
          of the test-broker request (``"ok"`` if there is nothing to show).
    """
    # ``dict.get(key, default)`` only falls back when the key is absent, so
    # an explicit null in the request must be normalised by hand; otherwise
    # the attribute access below fails before any error handling kicks in.
    channel_name = body.get("channelName")
    destination = (
        channel_name.partition(":")[0] if isinstance(channel_name, str) else ""
    )
    if not destination:
        return JSONResponse({"details": "Missing channelName"}, 400)

    message_wrapper = body.get("message")
    if message_wrapper is None:
        message_wrapper = {}
    options = body.get("options")
    if options is None:
        options = {}
    if not isinstance(message_wrapper, dict) or not isinstance(options, dict):
        return JSONResponse(
            {"details": "message and options must be objects"}, 400
        )

    payload: Any = message_wrapper.get("message")
    use_real_broker = options.get("sendToRealBroker", False)

    try:
        if use_real_broker:
            await self._broker.publish(payload, destination)
            return JSONResponse("ok", 200)

        # Dry run: wrap the broker in its test broker and issue a request.
        async with self._test_broker_cls(self._broker) as br:
            data = await br.request(payload, destination)
            decoded = None
            # Decoding the reply is best effort; a failure falls back to "ok".
            with suppress(Exception):
                decoded = await data.decode()
            return JSONResponse(
                decoded if decoded is not None and decoded != b"" else "ok", 200
            )
    except SubscriberNotFound:
        return JSONResponse({"details": f"{destination} destination not found."}, 404)
    except Exception as e:
        # Boundary handler: report the failure to the caller as a 500.
        return JSONResponse({"details": str(e)}, 500)
|