To declare an application-level context field, you need to call the context.set_global method with a key to indicate where the object will be placed in the context.
To set a local context (available only within the message processing scope), use the context manager scope. It could me extremely uselful to fill context with additional options in Middlewares
fromtypingimportAny,AnnotatedfromfaststreamimportContext,FastStream,BaseMiddlewarefromfaststream.kafkaimportKafkaBroker,KafkaMessagefromfaststream.typesimportAsyncFuncAnyfromfaststream.messageimportStreamMessageclassMiddleware(BaseMiddleware):asyncdefconsume_scope(self,call_next:AsyncFuncAny,msg:StreamMessage[Any],)->Any:withself.context.scope("correlation_id",msg.correlation_id):returnawaitsuper().consume_scope(call_next,msg)broker=KafkaBroker("localhost:9092",middlewares=[Middleware])app=FastStream(broker)@broker.subscriber("test-topic")asyncdefhandle(message:KafkaMessage,# get from the context toocorrelation_id:Annotated[str,Context()],):assertcorrelation_id==message.correlation_id
fromtypingimportAny,AnnotatedfromfaststreamimportContext,FastStream,BaseMiddlewarefromfaststream.confluentimportKafkaBroker,KafkaMessagefromfaststream.typesimportAsyncFuncAnyfromfaststream.messageimportStreamMessageclassMiddleware(BaseMiddleware):asyncdefconsume_scope(self,call_next:AsyncFuncAny,msg:StreamMessage[Any],)->Any:withself.context.scope("correlation_id",msg.correlation_id):returnawaitsuper().consume_scope(call_next,msg)broker=KafkaBroker("localhost:9092",middlewares=[Middleware])app=FastStream(broker)@broker.subscriber("test-topic")asyncdefhandle(message:KafkaMessage,# get from the context toocorrelation_id:Annotated[str,Context()],):assertcorrelation_id==message.correlation_id
fromtypingimportAny,AnnotatedfromfaststreamimportContext,FastStream,BaseMiddlewarefromfaststream.rabbitimportRabbitBroker,RabbitMessagefromfaststream.typesimportAsyncFuncAnyfromfaststream.messageimportStreamMessageclassMiddleware(BaseMiddleware):asyncdefconsume_scope(self,call_next:AsyncFuncAny,msg:StreamMessage[Any],)->Any:withself.context.scope("correlation_id",msg.correlation_id):returnawaitsuper().consume_scope(call_next,msg)broker=RabbitBroker("amqp://guest:guest@localhost:5672/",middlewares=[Middleware])app=FastStream(broker)@broker.subscriber("test-queue")asyncdefhandle(message:RabbitMessage,# get from the context toocorrelation_id:Annotated[str,Context()],):assertcorrelation_id==message.correlation_id
fromtypingimportAny,AnnotatedfromfaststreamimportContext,FastStream,BaseMiddlewarefromfaststream.natsimportNatsBroker,NatsMessagefromfaststream.typesimportAsyncFuncAnyfromfaststream.messageimportStreamMessageclassMiddleware(BaseMiddleware):asyncdefconsume_scope(self,call_next:AsyncFuncAny,msg:StreamMessage[Any],)->Any:withself.context.scope("correlation_id",msg.correlation_id):returnawaitsuper().consume_scope(call_next,msg)broker=NatsBroker("nats://localhost:4222",middlewares=[Middleware])app=FastStream(broker)@broker.subscriber("test-subject")asyncdefhandle(message:NatsMessage,# get from the context toocorrelation_id:Annotated[str,Context()],):assertcorrelation_id==message.correlation_id
fromtypingimportAny,AnnotatedfromfaststreamimportContext,FastStream,BaseMiddlewarefromfaststream.redisimportRedisBroker,RedisMessagefromfaststream.typesimportAsyncFuncAnyfromfaststream.messageimportStreamMessageclassMiddleware(BaseMiddleware):asyncdefconsume_scope(self,call_next:AsyncFuncAny,msg:StreamMessage[Any],)->Any:withself.context.scope("correlation_id",msg.correlation_id):returnawaitsuper().consume_scope(call_next,msg)broker=RedisBroker("redis://localhost:6379",middlewares=[Middleware])app=FastStream(broker)@broker.subscriber("test-channel")asyncdefhandle(message:RedisMessage,# get from the context toocorrelation_id:Annotated[str,Context()],):assertcorrelation_id==message.correlation_id
Also, FastStream has already created Annotated aliases to provide you with comfortable access to existing objects. You can import them directly from faststream or your broker-specific modules:
Sometimes, you may need to use a different name for the argument (not the one under which it is stored in the context) or get access to specific parts of the object. To do this, simply specify the name of what you want to access, and the context will provide you with the object.
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.kafkaimportKafkaBrokerfromfaststream.kafka.messageimportKafkaMessagebroker=KafkaBroker("localhost:9092")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Annotated[KafkaMessage,Context()],# get access to raw message):...
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.confluentimportKafkaBrokerfromfaststream.confluent.messageimportKafkaMessagebroker=KafkaBroker("localhost:9092")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Annotated[KafkaMessage,Context()],# get access to raw message):...
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.rabbitimportRabbitBrokerfromfaststream.rabbit.messageimportRabbitMessagebroker=RabbitBroker("amqp://guest:guest@localhost:5672/")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Annotated[RabbitMessage,Context()],# get access to raw message):...
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.natsimportNatsBrokerfromfaststream.nats.messageimportNatsMessagebroker=NatsBroker("nats://localhost:4222")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Annotated[NatsMessage,Context()],# get access to raw message):...
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.redisimportRedisBrokerfromfaststream.redis.messageimportRedisMessagebroker=RedisBroker("redis://localhost:6379")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Annotated[RedisMessage,Context()],# get access to raw message):...