Lifespan Hooks#

In FastStream, lifespan hooks are special functions executed at specific stages of the application’s lifecycle. They allow you to:

  • Initialize resources before the broker starts (configuration, databases)
  • Perform actions after the broker has started (send initial messages, warm up caches)
  • Gracefully shut down before the broker stops (save data, close connections)
  • Clean up resources after the application has fully stopped

There are four types of hooks:

  1. on_startup — before the broker is connected
  2. after_startup — after the broker is connected
  3. on_shutdown — before the broker is disconnected
  4. after_shutdown — after the broker is disconnected

Resource availability by hook#

The table below summarizes what is available in each of the hooks at different stages of the broker's life.

Hook             CLI args   Context   Broker life
on_startup       yes        yes       not connected yet
after_startup    no         yes       connected
on_shutdown      no         yes       still connected
after_shutdown   no         yes       disconnected

Call Order#

Lifespan hooks are called in a strict order following the application’s lifecycle, as shown in the table above: first on_startup, then after_startup, followed by on_shutdown, and finally after_shutdown.

You can define multiple functions for a single hook — they will be executed in the order they were registered. The order in which different hooks are declared does not affect their execution: FastStream guarantees that hooks are called according to the lifecycle sequence, regardless of registration order.

import logging

from faststream import FastStream
from faststream.kafka import KafkaBroker  # any supported broker works here

logger = logging.getLogger("faststream")

broker = KafkaBroker()
app = FastStream(broker)


@app.on_startup
async def startup_hook():
    logger.info("on_startup called")

@app.after_startup
async def after_startup_hook():
    logger.info("after_startup called")

@app.on_shutdown
async def shutdown_hook():
    logger.info("on_shutdown called")

@app.after_shutdown
async def after_shutdown_hook():
    logger.info("after_shutdown called")

Console output:

on_startup called
after_startup called
on_shutdown called
after_shutdown called

This allows you to safely separate logic and resource initialization across different functions and hooks without worrying about registration order. For an example of registering several functions on the same hook, see Multiple hooks below.

Usage example#

Let's imagine that your application uses pydantic as its settings manager.

Tip

I highly recommend using pydantic for this purpose: FastStream already uses this dependency, so you don't have to install an additional package.

Also, let's imagine that you have several files with your application settings (.env, .env.development, .env.test, .env.production), and you want to switch between them at startup without any code changes.

FastStream makes this easy by allowing you to pass optional command line arguments to your code.

Lifespan#

Let's write some code for our example

from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from pydantic_settings import BaseSettings

broker = KafkaBroker()

app = FastStream(broker)

class Settings(BaseSettings):
    host: str = "localhost:9092"


@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

The same code works with any supported broker. For Confluent Kafka, RabbitMQ, NATS, or Redis, only the broker import and class change (for example, from faststream.rabbit import RabbitBroker); the lifespan code itself stays the same.

Now this application can be run using the following command to manage the environment:

faststream run serve:app --env .env.test
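
For example, assuming the corresponding files exist, you can switch environments without touching the code:

faststream run serve:app --env .env.development
faststream run serve:app --env .env.production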

Details#

Now let's look at this in a little more detail.

To begin with, we use the @app.on_startup decorator

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

to declare a function that runs when our application starts.

Next, we declare the function parameters we expect to receive:

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

The env argument will be passed to the setup function from the user-provided command line arguments.

Tip

All lifespan hooks are always wrapped with the @apply_types decorator, so all context fields and dependencies are available inside them.
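
For instance, a hook can resolve a regular dependency via Depends, just like a subscriber. A minimal sketch (get_log_level is a hypothetical dependency):

import logging

from faststream import Depends

def get_log_level() -> str:
    # hypothetical dependency; could read an env var instead
    return "INFO"

@app.on_startup
async def configure_logging(level: str = Depends(get_log_level)):
    # resolved automatically because hooks are wrapped with @apply_types
    logging.getLogger("faststream").setLevel(level)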

Then, we initialize the settings of our application using the file passed to us from the command line:

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

And put these settings in a global context:

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

Note

Now we can access our settings anywhere in the application right from the context

from faststream import Context, apply_types

@apply_types
async def func(settings = Context()): ...
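
The same works inside a broker handler; a minimal sketch (the "test" subject name is illustrative):

@broker.subscriber("test")
async def handler(msg: str, settings=Context()):
    # "settings" is resolved from the global context set in the hook
    print(settings.host)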

As the last step, we connect to our broker: now, when the application starts, it will be ready to receive messages:

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
    await broker.connect(settings.host)

Another example#

Now let's imagine that we have a machine learning model that needs to process messages from some broker.

Initialization of such models usually takes a long time. It would be wise to do this at the start of the application, and not when processing each message.

You could initialize your model somewhere at the top of your module/file. However, in that case the code would run on every import of the module, for example, during testing.

Therefore, it is worth initializing the model in the @app.on_startup hook.

Also, we don't want the model to terminate uncleanly when the application is stopped. To avoid this, we also need to define the @app.on_shutdown hook:

from faststream import Context, ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

ml_models = {}  # fake ML model


def fake_answer_to_everything_ml_model(x: float) -> float:
    return x * 42


@app.on_startup
async def setup_model(context: ContextRepo):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    context.set_global("model", ml_models)


@app.on_shutdown
async def shutdown_model(model: dict = Context()):
    # Clean up the ML models and release the resources
    model.clear()


@broker.subscriber("test")
async def predict(x: float, model: dict = Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}

Again, the same example works with the other brokers (Confluent Kafka, RabbitMQ, NATS, Redis): only the broker import and connection URL change.

Multiple hooks#

If you register multiple functions for the same lifecycle hook, they are executed in the order they were registered:

from faststream import Context, ContextRepo, FastStream

app = FastStream()


@app.on_startup
async def setup(context: ContextRepo):
    context.set_global("field", 1)


@app.on_startup
async def setup_later(field: int = Context()):
    assert field == 1
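
Here setup runs first and puts field into the context, so by the time setup_later is called the value is already available and the assertion passes.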

Some more details#

Async or not async#

In the asynchronous version of the application, both asynchronous and synchronous functions can be used as hooks. In the synchronous version, only synchronous functions are supported.
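
For example, in an asynchronous application a plain synchronous function can be registered the same way (a minimal sketch, assuming the app from above):

@app.on_startup
def sync_startup_hook():
    # a synchronous hook also works in an async application
    print("on_startup (sync) called")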

Command line arguments#

Command line arguments are available in all @app.on_startup hooks. To use them in other parts of the application, put them in the ContextRepo.
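
A minimal sketch (the --mode flag name is purely illustrative):

@app.on_startup
async def store_args(context: ContextRepo, mode: str = "dev"):
    # store the CLI argument so the rest of the application can read it
    context.set_global("mode", mode)

With this hook, running faststream run serve:app --mode prod makes "prod" available everywhere via the context.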

Broker initialization#

The @app.on_startup hooks are called BEFORE the broker is launched by the application. The @app.after_shutdown hooks are triggered AFTER stopping the broker.

If you want to perform some actions AFTER initializing the broker: send messages, initialize objects, etc., you should use the @app.after_startup hook.
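
For example, sending an initial message belongs in @app.after_startup, because the broker connection is already established there. A sketch (the "test" destination is illustrative):

@app.after_startup
async def send_initial_message():
    # safe to publish: the broker is already connected
    await broker.publish("application started", "test")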