Often, you need not only to run your application to consume messages but also to make it a part of your service ecosystem with Prometheus metrics, K8S liveness and readiness probes, traces, and other observability features.
Unfortunately, such functionality can't be implemented by broker features alone, and you have to provide several HTTP endpoints in your app.
Of course, you can use FastStream as part of any ASGI framework (integrations), but the fewer the dependencies, the better, right?
Fortunately, we have built-in ASGI support. It is very limited but good enough to provide you with basic functionality for metrics and healthcheck endpoint implementation.
AsgiFastStream can call any ASGI-compatible callable object, so you can use endpoints from other libraries as long as they are compatible with the protocol.
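To make "ASGI-compatible callable" concrete, here is a minimal sketch of such an object written with the standard library only. The names health_app and call_once are illustrative and not part of FastStream; the sketch only shows the protocol shape (an async callable taking scope, receive, and send) that such an endpoint is expected to have.

```python
import asyncio


async def health_app(scope, receive, send):
    """Bare ASGI application: answers any HTTP request with 200 and a tiny body."""
    if scope["type"] != "http":
        # Non-HTTP scopes (lifespan, websocket) are not handled in this sketch.
        return
    body = b"ok"
    # An ASGI HTTP response is sent as two messages: start, then body.
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"text/plain"),
            (b"content-length", str(len(body)).encode()),
        ],
    })
    await send({"type": "http.response.body", "body": body})


def call_once(app, path="/health"):
    """Drive an ASGI app with one fake GET request and collect the sent messages."""
    sent = []

    async def receive():
        # A single, empty request body.
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "GET", "path": path, "headers": []}
    asyncio.run(app(scope, receive, send))
    return sent


if __name__ == "__main__":
    start, body = call_once(health_app)
    print(start["status"], body["body"])
```

Any object with this call signature, whether hand-written like the one above or exported by another library, is what the protocol compatibility requirement refers to. Writing raw send messages by hand gets verbose quickly, which is why helper decorators are convenient for simple endpoints.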
If you want to write your own simple HTTP endpoint, you can use our @get or @post decorator.
This feature is built on gunicorn + uvicorn, so you need to install them to run a FastStream ASGI app via the CLI. We pass all arguments directly to gunicorn; you can learn more about it here.
Moreover, our wrappers can be used as ready-to-use endpoints for other ASGI frameworks. This can be very helpful when you are running FastStream in the same runtime as another ASGI framework.
    from contextlib import asynccontextmanager

    from fastapi import FastAPI
    from faststream.nats import NatsBroker
    from faststream.specification import AsyncAPI
    from faststream.asgi import make_ping_asgi, make_asyncapi_asgi

    broker = NatsBroker()

    asyncapi = AsyncAPI(broker)

    @asynccontextmanager
    async def start_broker(app):
        """Start the broker with the app."""
        async with broker:
            await broker.start()
            yield

    app = FastAPI(lifespan=start_broker)

    app.mount("/health", make_ping_asgi(broker, timeout=5.0))
    app.mount("/asyncapi", make_asyncapi_asgi(asyncapi))