async def process(self, body: TryItOutForm) -> AsgiResponse:
"""Process parsed body: validate, dry-run or publish. Returns response."""
channel = body.get("channelName", "")
destination, *_ = channel.split(":")
if not destination:
return JSONResponse({"details": "Missing channelName"}, 400)
if len(self._entries) == 1:
broker, test_broker_cls = self._entries[0]
else:
entry = next(
(e for e in self._entries if channel in _iter_broker_channels(e[0])),
None,
)
if entry is None:
entry = next(
(
e
for e in self._entries
if destination in _iter_broker_destinations(e[0])
),
None,
)
if entry is None:
return JSONResponse(
{"details": f"{destination} destination not found."}, 404
)
broker, test_broker_cls = entry
payload: Any = body.get("message", {}).get("message")
use_real_broker = body.get("options", {}).get("sendToRealBroker", False)
try:
if use_real_broker:
await broker.publish(payload, destination)
return JSONResponse("ok", 200)
same_type_brokers = (
broker,
*(
b
for b, cls in self._entries
if cls is test_broker_cls and b is not broker
),
)
# ``test_broker_cls`` is typed as the abstract ``TestBroker`` base, whose
# overloaded ``__init__`` would make mypy try to instantiate the abstract
# class. At runtime it is always a concrete subclass; the enter result is
# unused here, so call it through a plain factory type.
test_broker_factory = cast(
"Callable[..., TestBroker[Any, Any]]", test_broker_cls
)
async with test_broker_factory(*same_type_brokers):
data = await broker.request(payload, destination, timeout=30)
decoded = None
with suppress(Exception):
decoded = await data.decode()
return JSONResponse(
decoded if decoded is not None and decoded != b"" else "ok", 200
)
except SubscriberNotFound:
return JSONResponse({"details": f"{destination} destination not found."}, 404)
except Exception as e:
return JSONResponse({"details": repr(e)}, 500)