Skip to content

Context Fields Declaration#

You can also store your own objects in the Context.

Global#

To declare an application-level context field, you need to call the context.set_global method with a key to indicate where the object will be placed in the context.

1
2
3
4
5
6
7
8
9
from typing import Annotated
from faststream import FastStream, ContextRepo, Context
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

@app.on_startup
async def set_global(context: ContextRepo):
    context.set_global("secret_str", "my-perfect-secret")
1
2
3
4
5
6
7
8
9
from typing import Annotated
from faststream import FastStream, ContextRepo, Context
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")

@app.on_startup
async def set_global(context: ContextRepo):
    context.set_global("secret_str", "my-perfect-secret")
1
2
3
4
5
6
7
8
9
from typing import Annotated
from faststream import FastStream, ContextRepo, Context
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

@app.on_startup
async def set_global(context: ContextRepo):
    context.set_global("secret_str", "my-perfect-secret")
1
2
3
4
5
6
7
8
9
from typing import Annotated
from faststream import FastStream, ContextRepo, Context
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")

@app.on_startup
async def set_global(context: ContextRepo):
    context.set_global("secret_str", "my-perfect-secret")
1
2
3
4
5
6
7
8
9
from typing import Annotated
from faststream import FastStream, ContextRepo, Context
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")

@app.on_startup
async def set_global(context: ContextRepo):
    context.set_global("secret_str", "my-perfect-secret")

Afterward, you can access your secret field in the usual way:

1
2
3
4
5
@broker.subscriber("test-topic")
async def handle(
    secret_str: Annotated[str, Context()],
):
    assert secret_str == "my-perfect-secret"
1
2
3
4
5
@broker.subscriber("test-topic")
async def handle(
    secret_str: Annotated[str, Context()],
):
    assert secret_str == "my-perfect-secret"
1
2
3
4
5
@broker.subscriber("test-queue")
async def handle(
    secret_str: Annotated[str, Context()],
):
    assert secret_str == "my-perfect-secret"
1
2
3
4
5
@broker.subscriber("test-subject")
async def handle(
    secret_str: Annotated[str, Context()],
):
    assert secret_str == "my-perfect-secret"
1
2
3
4
5
@broker.subscriber("test-channel")
async def handle(
    secret_str: Annotated[str, Context()],
):
    assert secret_str == "my-perfect-secret"

In this case, the field becomes a global context field: it does not depend on the current message handler (unlike message).

Tip

Alternatively, you can set up global context objects in the FastStream object constructor:

from faststream import FastStream, ContextRepo

app = FastStream(context=ContextRepo({
    "secret_str": "my-perfect-secret"
}))

To remove a field from the context, use the reset_global method:

context.reset_global("my_key")

Local#

To set a local context (available only within the message processing scope), use the context manager scope. It could be extremely useful to fill the context with additional options in Middlewares:

from typing import Any, Annotated
from faststream import Context, FastStream, BaseMiddleware
from faststream.kafka import KafkaBroker, KafkaMessage
from faststream.types import AsyncFuncAny
from faststream.message import StreamMessage

class Middleware(BaseMiddleware):
    async def consume_scope(
        self,
        call_next: AsyncFuncAny,
        msg: StreamMessage[Any],
    ) -> Any:
        with self.context.scope("correlation_id", msg.correlation_id):
            return await super().consume_scope(call_next, msg)

broker = KafkaBroker("localhost:9092", middlewares=[Middleware])
app = FastStream(broker)

@broker.subscriber("test-topic")
async def handle(
    message: KafkaMessage,  # get from the context too
    correlation_id: Annotated[str, Context()],
):
    assert correlation_id == message.correlation_id
from typing import Any, Annotated
from faststream import Context, FastStream, BaseMiddleware
from faststream.confluent import KafkaBroker, KafkaMessage
from faststream.types import AsyncFuncAny
from faststream.message import StreamMessage

class Middleware(BaseMiddleware):
    async def consume_scope(
        self,
        call_next: AsyncFuncAny,
        msg: StreamMessage[Any],
    ) -> Any:
        with self.context.scope("correlation_id", msg.correlation_id):
            return await super().consume_scope(call_next, msg)

broker = KafkaBroker("localhost:9092", middlewares=[Middleware])
app = FastStream(broker)

@broker.subscriber("test-topic")
async def handle(
    message: KafkaMessage,  # get from the context too
    correlation_id: Annotated[str, Context()],
):
    assert correlation_id == message.correlation_id
from typing import Any, Annotated
from faststream import Context, FastStream, BaseMiddleware
from faststream.rabbit import RabbitBroker, RabbitMessage
from faststream.types import AsyncFuncAny
from faststream.message import StreamMessage

class Middleware(BaseMiddleware):
    async def consume_scope(
        self,
        call_next: AsyncFuncAny,
        msg: StreamMessage[Any],
    ) -> Any:
        with self.context.scope("correlation_id", msg.correlation_id):
            return await super().consume_scope(call_next, msg)

broker = RabbitBroker("amqp://guest:guest@localhost:5672/", middlewares=[Middleware])
app = FastStream(broker)

@broker.subscriber("test-queue")
async def handle(
    message: RabbitMessage,  # get from the context too
    correlation_id: Annotated[str, Context()],
):
    assert correlation_id == message.correlation_id
from typing import Any, Annotated
from faststream import Context, FastStream, BaseMiddleware
from faststream.nats import NatsBroker, NatsMessage
from faststream.types import AsyncFuncAny
from faststream.message import StreamMessage

class Middleware(BaseMiddleware):
    async def consume_scope(
        self,
        call_next: AsyncFuncAny,
        msg: StreamMessage[Any],
    ) -> Any:
        with self.context.scope("correlation_id", msg.correlation_id):
            return await super().consume_scope(call_next, msg)

broker = NatsBroker("nats://localhost:4222", middlewares=[Middleware])
app = FastStream(broker)

@broker.subscriber("test-subject")
async def handle(
    message: NatsMessage,  # get from the context too
    correlation_id: Annotated[str, Context()],
):
    assert correlation_id == message.correlation_id
from typing import Any, Annotated
from faststream import Context, FastStream, BaseMiddleware
from faststream.redis import RedisBroker, RedisMessage
from faststream.types import AsyncFuncAny
from faststream.message import StreamMessage

class Middleware(BaseMiddleware):
    async def consume_scope(
        self,
        call_next: AsyncFuncAny,
        msg: StreamMessage[Any],
    ) -> Any:
        with self.context.scope("correlation_id", msg.correlation_id):
            return await super().consume_scope(call_next, msg)

broker = RedisBroker("redis://localhost:6379", middlewares=[Middleware])
app = FastStream(broker)

@broker.subscriber("test-channel")
async def handle(
    message: RedisMessage,  # get from the context too
    correlation_id: Annotated[str, Context()],
):
    assert correlation_id == message.correlation_id

Existing Fields#

Context already contains some global objects that you can always access:

  • broker - the current broker
  • context - the context itself, in which you can write your own fields
  • logger - the logger used for your broker (tags messages with message_id)
  • message - the raw message (if you need access to it)

At the same time, thanks to contextvars.ContextVar, message is local to your current consumer scope.

Access to Context Fields#

By default, the context searches for an object based on the argument name.

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

async def handle(
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")

@broker_object.subscriber("response-topic")
from faststream import Context, FastStream
from faststream.confluent import KafkaBroker

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

async def handle(
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")

@broker_object.subscriber("response-topic")
from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker

broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)

async def handle(
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")

@broker_object.subscriber("response-queue")
from faststream import Context, FastStream
from faststream.nats import NatsBroker

broker_object = NatsBroker("nats://localhost:4222")
app = FastStream(broker_object)

async def handle(
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")

@broker_object.subscriber("response-subject")
from faststream import Context, FastStream
from faststream.redis import RedisBroker

broker_object = RedisBroker("redis://localhost:6379")
app = FastStream(broker_object)

async def handle(
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")

@broker_object.subscriber("response-channel")

Annotated Aliases#

Also, FastStream has already created Annotated aliases to provide you with comfortable access to existing objects. You can import them directly from faststream or your broker-specific modules:

  • Shared aliases
from faststream import Logger, ContextRepo
from faststream.kafka.annotations import (
    Logger, ContextRepo, KafkaMessage,
    KafkaBroker, KafkaProducer, NoCast,
)

faststream.kafka.KafkaMessage is an alias to faststream.kafka.annotations.KafkaMessage

from faststream.kafka import KafkaMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    ContextRepo,
    KafkaMessage,
    Logger,
    KafkaBroker as BrokerAnnotation,
)

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

@broker_object.subscriber("response-topic")
async def handle_response(
    logger: Logger,
    message: KafkaMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream.confluent.annotations import (
    Logger, ContextRepo, KafkaMessage,
    KafkaBroker, KafkaProducer, NoCast,
)

faststream.confluent.KafkaMessage is an alias to faststream.confluent.annotations.KafkaMessage

from faststream.confluent import KafkaMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.annotations import (
    ContextRepo,
    KafkaMessage,
    Logger,
    KafkaBroker as BrokerAnnotation,
)

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

@broker_object.subscriber("response-topic")
async def handle_response(
    logger: Logger,
    message: KafkaMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream.rabbit.annotations import (
    Logger, ContextRepo, RabbitMessage,
    RabbitBroker, RabbitProducer, NoCast,
)

faststream.rabbit.RabbitMessage is an alias to faststream.rabbit.annotations.RabbitMessage

from faststream.rabbit import RabbitMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import (
    ContextRepo,
    RabbitMessage,
    Logger,
    RabbitBroker as BrokerAnnotation,
)

broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)

@broker_object.subscriber("response-queue")
async def handle_response(
    logger: Logger,
    message: RabbitMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream.nats.annotations import (
    Logger, ContextRepo, NatsMessage,
    NatsBroker, NatsProducer, NatsJsProducer,
    Client, JsClient, NoCast,
)

faststream.nats.NatsMessage is an alias to faststream.nats.annotations.NatsMessage

from faststream.nats import NatsMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.nats import NatsBroker
from faststream.nats.annotations import (
    ContextRepo,
    NatsMessage,
    Logger,
    NatsBroker as BrokerAnnotation,
)

broker_object = NatsBroker("nats://localhost:4222")
app = FastStream(broker_object)

@broker_object.subscriber("response-subject")
async def handle_response(
    logger: Logger,
    message: NatsMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream.redis.annotations import (
    Logger, ContextRepo, RedisMessage,
    RedisBroker, Redis, NoCast,
)

faststream.redis.RedisMessage is an alias to faststream.redis.annotations.RedisMessage

from faststream.redis import RedisMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.redis import RedisBroker
from faststream.redis.annotations import (
    ContextRepo,
    RedisMessage,
    Logger,
    RedisBroker as BrokerAnnotation,
)

broker_object = RedisBroker("redis://localhost:6379")
app = FastStream(broker_object)

@broker_object.subscriber("response-channel")
async def handle_response(
    logger: Logger,
    message: RedisMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")

Context Extra Options#

Additionally, Context provides you with some extra capabilities for working with containing objects.

Default Values#

For instance, if you attempt to access a field that doesn't exist in the global context, you will receive a pydantic.ValidationError exception.

However, you can set default values if needed.

1
2
3
4
@broker.subscriber("test-topic")
async def handle(
    not_existed: Annotated[None, Context("not_existed", default=None)],
):
1
2
3
4
@broker.subscriber("test-topic")
async def handle(
    not_existed: Annotated[None, Context("not_existed", default=None)],
):
1
2
3
4
@broker.subscriber("test-queue")
async def handle(
    not_existed: Annotated[None, Context("not_existed", default=None)],
):
1
2
3
4
@broker.subscriber("test-subject")
async def handle(
    not_existed: Annotated[None, Context("not_existed", default=None)],
):
1
2
3
4
@broker.subscriber("test-channel")
async def handle(
    not_existed: Annotated[None, Context("not_existed", default=None)],
):

Cast Context Types#

By default, context fields are NOT CAST to the type specified in their annotation.

from typing import Annotated
from faststream import Context, FastStream, ContextRepo
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker, context=ContextRepo({
    "secret": "1"
}))

@broker.subscriber("test-topic")
async def handle(
    secret: Annotated[int, Context()],
):
    assert secret == "1"
from typing import Annotated
from faststream import Context, FastStream, ContextRepo
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker, context=ContextRepo({
    "secret": "1"
}))

@broker.subscriber("test-topic")
async def handle(
    secret: Annotated[int, Context()],
):
    assert secret == "1"
from typing import Annotated
from faststream import Context, FastStream, ContextRepo
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker, context=ContextRepo({
    "secret": "1"
}))

@broker.subscriber("test-queue")
async def handle(
    secret: Annotated[int, Context()],
):
    assert secret == "1"
from typing import Annotated
from faststream import Context, FastStream, ContextRepo
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker, context=ContextRepo({
    "secret": "1"
}))

@broker.subscriber("test-subject")
async def handle(
    secret: Annotated[int, Context()],
):
    assert secret == "1"
from typing import Annotated
from faststream import Context, FastStream, ContextRepo
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker, context=ContextRepo({
    "secret": "1"
}))

@broker.subscriber("test-channel")
async def handle(
    secret: Annotated[int, Context()],
):
    assert secret == "1"

If you require this functionality, you can enable the appropriate flag.

1
2
3
4
@broker.subscriber("test-topic2")
async def handle_int(
    secret: Annotated[int, Context(cast=True)],
):
1
2
3
4
@broker.subscriber("test-topic2")
async def handle_int(
    secret: Annotated[int, Context(cast=True)],
):
1
2
3
4
@broker.subscriber("test-queue2")
async def handle_int(
    secret: Annotated[int, Context(cast=True)],
):
1
2
3
4
@broker.subscriber("test-subject2")
async def handle_int(
    secret: Annotated[int, Context(cast=True)],
):
1
2
3
4
@broker.subscriber("test-channel2")
async def handle_int(
    secret: Annotated[int, Context(cast=True)],
):

Initial Value#

Also, Context provides you with an initial option to set up a base context value without a previous set_global call.

1
2
3
4
5
6
@broker.subscriber("test-topic")
async def handle(
    msg: str,
    collector: Annotated[list[str], Context(initial=list)],
):
    collector.append(msg)
1
2
3
4
5
6
@broker.subscriber("test-topic")
async def handle(
    msg: str,
    collector: Annotated[list[str], Context(initial=list)],
):
    collector.append(msg)
1
2
3
4
5
6
@broker.subscriber("test-queue")
async def handle(
    msg: str,
    collector: Annotated[list[str], Context(initial=list)],
):
    collector.append(msg)
1
2
3
4
5
6
@broker.subscriber("test-subject")
async def handle(
    msg: str,
    collector: Annotated[list[str], Context(initial=list)],
):
    collector.append(msg)
1
2
3
4
5
6
@broker.subscriber("test-channel")
async def handle(
    msg: str,
    collector: Annotated[list[str], Context(initial=list)],
):
    collector.append(msg)

Access by Name#

Sometimes, you may need to use a different name for the argument (not the one under which it is stored in the context) or get access to specific parts of the object. To do this, simply specify the name of what you want to access, and the context will provide you with the object.

from typing import Annotated
from faststream import Context, FastStream
from faststream.kafka import KafkaBroker, KafkaMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test-topic")
async def handle(
    msg: KafkaMessage,
    correlation_id: Annotated[str, Context("message.correlation_id")],
    user_header: Annotated[str, Context("message.headers.user")],
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header
from typing import Annotated
from faststream import Context, FastStream
from faststream.confluent import KafkaBroker, KafkaMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test-topic")
async def handle(
    msg: KafkaMessage,
    correlation_id: Annotated[str, Context("message.correlation_id")],
    user_header: Annotated[str, Context("message.headers.user")],
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header
from typing import Annotated
from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker, RabbitMessage

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("test-queue")
async def handle(
    msg: RabbitMessage,
    correlation_id: Annotated[str, Context("message.correlation_id")],
    user_header: Annotated[str, Context("message.headers.user")],
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header
from typing import Annotated
from faststream import Context, FastStream
from faststream.nats import NatsBroker, NatsMessage

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

@broker.subscriber("test-subject")
async def handle(
    msg: NatsMessage,
    correlation_id: Annotated[str, Context("message.correlation_id")],
    user_header: Annotated[str, Context("message.headers.user")],
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header
from typing import Annotated
from faststream import Context, FastStream
from faststream.redis import RedisBroker, RedisMessage

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

@broker.subscriber("test-channel")
async def handle(
    msg: RedisMessage,
    correlation_id: Annotated[str, Context("message.correlation_id")],
    user_header: Annotated[str, Context("message.headers.user")],
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header

This way, you can get access to a specific field of a context object:

    correlation_id: Annotated[str, Context("message.correlation_id")],

Or even to a dict key:

    user_header: Annotated[str, Context("message.headers.user")],

Application Context#

FastStream has its own Dependency Injection container - Context, used to store application runtime objects and variables.

With this container, you can access both application scope and message processing scope objects. This functionality is similar to Depends usage.

from typing import Annotated
from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.message import KafkaMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Annotated[KafkaMessage, Context()],  # get access to raw message
):
    ...
from typing import Annotated
from faststream import Context, FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.message import KafkaMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Annotated[KafkaMessage, Context()],  # get access to raw message
):
    ...
from typing import Annotated
from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.message import RabbitMessage

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Annotated[RabbitMessage, Context()],  # get access to raw message
):
    ...
from typing import Annotated
from faststream import Context, FastStream
from faststream.nats import NatsBroker
from faststream.nats.message import NatsMessage

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Annotated[NatsMessage, Context()],  # get access to raw message
):
    ...
from typing import Annotated
from faststream import Context, FastStream
from faststream.redis import RedisBroker
from faststream.redis.message import RedisMessage

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Annotated[RedisMessage, Context()],  # get access to raw message
):
    ...

Usages#

By default, the context is available in the same places as Depends:

  • at lifespan hooks
  • message subscribers
  • nested dependencies

Tip

You can get access to the Context in Middlewares as self.context