Lifespan hooks are called in a strict order following the application’s lifecycle, as shown in the table above: first on_startup, then after_startup, followed by on_shutdown, and finally after_shutdown.
You can define multiple functions for a single hook — they will be executed in the order they were registered. The order in which different hooks are declared does not affect their execution: FastStream guarantees that hooks are called according to the lifecycle sequence, regardless of registration order.
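For example, here is a minimal sketch that deliberately declares the hooks out of lifecycle order (the Kafka broker is used purely for illustration; any broker works the same way):

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# The hooks are declared out of lifecycle order on purpose:
# FastStream still calls them as on_startup -> after_startup ->
# on_shutdown -> after_shutdown.
@app.after_shutdown
async def after_shutdown_hook():
    print("after_shutdown called")


@app.on_shutdown
async def on_shutdown_hook():
    print("on_shutdown called")


@app.after_startup
async def after_startup_hook():
    print("after_startup called")


@app.on_startup
async def on_startup_hook():
    print("on_startup called")
```

Starting the application and then stopping it (for example, with Ctrl+C) prints: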
```
on_startup called
after_startup called
on_shutdown called
after_shutdown called
```
This allows you to safely separate logic and resource initialization across different functions and hooks without worrying about the order of registration.
You can also register as many hooks as you need: all of them are collected and executed as part of the lifecycle described above.
Let's imagine that your application uses pydantic as its settings manager.
I highly recommend using pydantic for this purpose: FastStream already depends on it, so you don't have to install an additional package.
Also, let's imagine that you have several settings files, .env, .env.development, .env.test, and .env.production, and you want to switch between them at startup without any code changes.
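A minimal sketch of that pattern might look like the following; the `Settings` fields and the `env` option are illustrative assumptions, and the CLI call in the comment assumes you run the app via `faststream run`, which forwards extra options to hook arguments:

```python
from pydantic_settings import BaseSettings  # on pydantic v1: `from pydantic import BaseSettings`

from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


class Settings(BaseSettings):
    # illustrative fields -- adapt them to your application
    queue: str = "test-queue"
    timeout: float = 5.0


@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    # the `env` argument can be overridden at startup, e.g.
    # `faststream run main:app --env .env.test`
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
```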
Now let's imagine that we have a machine learning model that needs to process messages from some broker.
Initialization of such models usually takes a long time. It would be wise to do this at the start of the application, and not when processing each message.
You can initialize your model somewhere at the top of your module/file. However, in that case, the code will run whenever the module is merely imported, for example during testing.
Therefore, it is worth initializing the model in the @app.on_startup hook.
Also, we don't want the model to shut down incorrectly when the application stops. To avoid this, we also need to define the @app.on_shutdown hook:
Kafka:

```python
from faststream import Context, ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

ml_models = {}  # fake ML model


def fake_answer_to_everything_ml_model(x: float) -> float:
    return x * 42


@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)


@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()


@broker.subscriber("test")
async def predict(x: float, model: dict = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
```
Confluent Kafka:

```python
from faststream import Context, ContextRepo, FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

ml_models = {}  # fake ML model


def fake_answer_to_everything_ml_model(x: float) -> float:
    return x * 42


@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)


@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()


@broker.subscriber("test")
async def predict(x: float, model: dict = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
```
RabbitMQ:

```python
from faststream import Context, ContextRepo, FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

ml_models = {}  # fake ML model


def fake_answer_to_everything_ml_model(x: float) -> float:
    return x * 42


@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)


@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()


@broker.subscriber("test")
async def predict(x: float, model: dict = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
```
NATS:

```python
from faststream import Context, ContextRepo, FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

ml_models = {}  # fake ML model


def fake_answer_to_everything_ml_model(x: float) -> float:
    return x * 42


@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)


@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()


@broker.subscriber("test")
async def predict(x: float, model: dict = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
```
Redis:

```python
from faststream import Context, ContextRepo, FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

ml_models = {}  # fake ML model


def fake_answer_to_everything_ml_model(x: float) -> float:
    return x * 42


@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)


@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()


@broker.subscriber("test")
async def predict(x: float, model: dict = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}
```
In the asynchronous version of the application, both asynchronous and synchronous methods can be used as hooks.
In the synchronous version, only synchronous methods are available.
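For example, a small sketch of the asynchronous application mixing both kinds of hooks (the Kafka broker is used purely for illustration):

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker

app = FastStream(KafkaBroker("localhost:9092"))


@app.on_startup
def sync_setup():
    # a plain (synchronous) function is accepted as a hook
    print("sync on_startup called")


@app.on_startup
async def async_setup():
    # ...and so is a coroutine
    print("async on_startup called")
```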
The @app.on_startup hooks are called BEFORE the broker is launched by the application. The @app.after_shutdown hooks are triggered AFTER stopping the broker.
If you want to perform some actions AFTER the broker has been initialized (send messages, initialize objects, etc.), you should use the @app.after_startup hook.
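A minimal sketch of such a hook that publishes a notification once the broker is up (the topic name and message are illustrative assumptions):

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@app.after_startup
async def announce():
    # the broker is already connected here, so it is safe to publish
    await broker.publish("service started", topic="service-events")
```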