Middlewares in FastStream allow you to process messages before and after they are handled by your code.
This allows you to add common functionality to multiple handlers without duplicating code.
Middlewares help keep your business logic separate from the technical aspects of your application.
In this section, you will find a list of available middlewares and detailed information about how they work. You can also learn how to create your own middleware.
The parser, filter, decoder, and publish stages are service functions that FastStream itself uses during event processing. The full sequence is:
on_receive - This method is called first for every incoming message, regardless of whether the message will be processed.
parser - Converts native broker messages (aiopika, aiokafka, redis, etc.) into FastStream's StreamMessage format.
filter - Applies filtering logic based on user-defined filter parameters.
consume_scope - If the filter passes, the flow continues to the handler; otherwise, the event is passed to another handler.
decoder - Deserializes message bytes into dictionaries or structured data.
handler - Executes the message handling function.
publish_scope - This method is called for every outgoing message, which includes messages sent via @publisher decorators, direct calls to broker.publish() or broker.request(), and any replies.
publish - The publish_scope calls the publish method, and the result of consume_scope will be used as the argument for sending the message.
after_processed - Final cleanup and post-processing stage.
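The stage order above can be modelled with a small, standard-library-only sketch. This is a toy pipeline, not FastStream's implementation: `run_pipeline`, the `trace` list, and the inline parser, filter, and decoder are hypothetical stand-ins that only record when each stage would run.

```python
import asyncio
import json

trace: list[str] = []  # records the order in which the toy stages run


async def publish(result: dict) -> None:
    trace.append("publish")


async def publish_scope(result: dict) -> None:
    trace.append("publish_scope")
    await publish(result)  # the scope wraps the actual send


async def handler(body: dict) -> dict:
    trace.append("handler")
    return {"echo": body}


async def consume_scope(message: dict) -> dict:
    trace.append("consume_scope")
    trace.append("decoder")
    body = json.loads(message["body"])  # bytes -> structured data
    return await handler(body)


async def run_pipeline(raw: bytes) -> None:
    trace.append("on_receive")
    try:
        trace.append("parser")
        message = {"body": raw}  # stand-in for a parsed message object
        trace.append("filter")
        if message["body"]:  # toy filter: accept non-empty payloads
            result = await consume_scope(message)
            await publish_scope(result)  # the handler result is what gets sent
    finally:
        trace.append("after_processed")  # runs last, even if a stage raised


asyncio.run(run_pipeline(b'{"id": 1}'))
print(" -> ".join(trace))
```

The `try`/`finally` reflects the idea that the post-processing stage runs whether or not the handler succeeded.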
```python
from types import TracebackType

from faststream import BaseMiddleware


class MyMiddleware(BaseMiddleware):
    async def on_receive(self) -> None:
        # All events are included here, without any other side effects.
        print(f"Received: {self.msg}")
        return await super().on_receive()

    async def after_processed(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: TracebackType | None = None,
    ) -> bool | None:
        if exc_type:  # Catch the error if it occurred in your handler
            ...
        return await super().after_processed(exc_type, exc_val, exc_tb)


# You can register them to the broker or router scopes.
broker = Broker(middlewares=[MyMiddleware])  # global scope
# Or
router = BrokerRouter(middlewares=[MyMiddleware])  # router scope
```
Middlewares can be registered at the broker scope or the router scope.
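The `after_processed` signature in the example above mirrors Python's `__aexit__` protocol. The sketch below assumes the return value is interpreted the same way, so that a truthy result suppresses the handler's exception; that wiring is an assumption here, not something this page states. `ToyMiddleware`, `process`, and `events` are hypothetical stand-ins built only on the standard library.

```python
import asyncio
from types import TracebackType
from typing import Optional

events: list[str] = []  # records what the toy middleware observed


class ToyMiddleware:
    """Hypothetical stand-in that maps the two hooks onto `async with`."""

    async def on_receive(self) -> None:
        events.append("on_receive")

    async def after_processed(
        self,
        exc_type: Optional[type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exc_tb: Optional[TracebackType] = None,
    ) -> Optional[bool]:
        if exc_type is not None:
            events.append(f"error: {exc_type.__name__}")
            return True  # assumption: truthy means "exception handled"
        events.append("ok")
        return None

    async def __aenter__(self) -> "ToyMiddleware":
        await self.on_receive()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
        return await self.after_processed(exc_type, exc_val, exc_tb)


async def process(fail: bool) -> None:
    async with ToyMiddleware():
        if fail:
            raise ValueError("handler failed")


asyncio.run(process(fail=False))
asyncio.run(process(fail=True))  # the ValueError is swallowed by after_processed
print(events)
```

Returning `None` or `False` instead would let the exception propagate, which is usually the safer default for a logging-only middleware.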
```python
from types import TracebackType
from typing import Any, Awaitable, Callable

from faststream import BaseMiddleware, PublishCommand, StreamMessage


class MyMiddleware(BaseMiddleware):
    # Use this if you want to add logic when a message is received for the first time,
    # such as logging incoming messages, validating headers, or setting up the context.
    async def on_receive(self) -> Any:
        print(f"Received: {self.msg}")
        return await super().on_receive()

    # Use this if you want to wrap the entire message processing process,
    # such as implementing retry logic, circuit breakers, rate limiting, or authentication.
    async def consume_scope(
        self,
        call_next: Callable[[StreamMessage[Any]], Awaitable[Any]],
        msg: StreamMessage[Any],
    ) -> Any:
        return await call_next(msg)

    # Use this if you want to customize outgoing messages before they are sent,
    # such as adding encryption, compression, or custom headers.
    async def publish_scope(
        self,
        call_next: Callable[[PublishCommand], Awaitable[Any]],
        cmd: PublishCommand,
    ) -> Any:
        return await call_next(cmd)

    # Use this if you want to perform post-processing tasks after message handling has completed,
    # such as cleaning up, logging errors, collecting metrics, or committing transactions.
    async def after_processed(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: TracebackType | None = None,
    ) -> bool | None:
        return await super().after_processed(exc_type, exc_val, exc_tb)
```
Pay attention to the order: the methods are executed in this sequence, each after its corresponding stage. Read more in Middlewares Flow.
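As an illustration of the wrapping role that consume_scope plays, here is a minimal retry sketch using only the standard library. `retrying_scope`, `flaky_handler`, and `max_attempts` are hypothetical names; in a real middleware the same loop would sit inside consume_scope around `await call_next(msg)`.

```python
import asyncio
from typing import Any, Awaitable, Callable

calls = 0  # counts how many times the toy handler was invoked


async def flaky_handler(msg: str) -> str:
    """Hypothetical handler that fails twice before succeeding."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("temporary failure")
    return f"processed {msg}"


async def retrying_scope(
    call_next: Callable[[Any], Awaitable[Any]],
    msg: Any,
    max_attempts: int = 3,
) -> Any:
    """Call `call_next(msg)` up to `max_attempts` times, re-raising the last error."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await call_next(msg)
        except ConnectionError:
            if attempt == max_attempts:
                raise
            # Yield to the event loop; a real backoff would wait longer.
            await asyncio.sleep(0)


result = asyncio.run(retrying_scope(flaky_handler, "order-1"))
print(result, calls)
```

Retrying only on a narrow exception type, as done here, avoids masking programming errors that a retry cannot fix.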
To intercept the publishing process, use the publish_scope method. It receives the message body and any other options passed to the publish method (such as destination, headers, etc.), so you can patch them any way you want.
publish_scope affects every way of publishing a message, including the broker.publish(...) call and reply-to / RPC replies.
To differentiate between types of publishers, check cmd.publish_type. It is an enum value that identifies whether the message is a regular publish, an RPC request, or a reply.
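A minimal sketch of branching on the publish type inside publish_scope follows. To stay runnable without a broker, it uses hypothetical stand-ins: the `PublishKind` enum and `FakeCommand` dataclass are not FastStream classes, and the header name `x-origin` is made up for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Awaitable, Callable


class PublishKind(Enum):
    """Hypothetical stand-in for the enum exposed as `cmd.publish_type`."""

    PUBLISH = "publish"
    REQUEST = "request"
    REPLY = "reply"


@dataclass
class FakeCommand:
    """Hypothetical stand-in for a publish command."""

    body: Any
    publish_type: PublishKind
    headers: dict[str, str] = field(default_factory=dict)


async def publish_scope(
    call_next: Callable[[FakeCommand], Awaitable[Any]],
    cmd: FakeCommand,
) -> Any:
    # Tag replies differently from regular publishes and requests.
    if cmd.publish_type is PublishKind.REPLY:
        cmd.headers["x-origin"] = "reply"
    else:
        cmd.headers["x-origin"] = "outgoing"
    return await call_next(cmd)  # always pass the command on


sent: list[FakeCommand] = []


async def fake_send(cmd: FakeCommand) -> None:
    sent.append(cmd)  # stands in for the broker actually sending


async def main() -> None:
    await publish_scope(fake_send, FakeCommand("hi", PublishKind.PUBLISH))
    await publish_scope(fake_send, FakeCommand("pong", PublishKind.REPLY))


asyncio.run(main())
print([c.headers["x-origin"] for c in sent])
```

Mutating the command before `call_next` is what lets one middleware patch every outgoing message in a single place.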
When you publish multiple messages at once using the broker.publish_batch(...) method, the publish_scope receives a BatchPublishCommand object. This object holds all the messages to be sent in its cmd.batch_bodies attribute. This feature is useful for intercepting and modifying the batch publication process.
If the basic PublishCommand does not meet your needs, you can use the extended option. Here is an example:
```python
from typing import Any, Awaitable, Callable

from faststream import BaseMiddleware, BatchPublishCommand


class BatchPublishMiddleware(BaseMiddleware[BatchPublishCommand]):
    async def publish_scope(
        self,
        call_next: Callable[[BatchPublishCommand], Awaitable[Any]],
        cmd: BatchPublishCommand,
    ) -> Any:
        # you can access `cmd.batch_bodies` here
        return await call_next(cmd)
```
Middlewares in FastStream offer a powerful mechanism to hook into the message processing lifecycle. Key points to remember:
Order of Execution: Methods are called in a specific sequence: on_receive → parser → filter → consume_scope → decoder → handler → publish_scope → publish → after_processed.
Comprehensive Publishing Hook: The publish_scope method intercepts all outgoing messages, regardless of whether they are from a @publisher decorator, a direct broker.publish() or publisher.publish() call, or an RPC broker.request().
Chain of Responsibility: For the message to continue through the processing pipeline, your middleware must call the next component in the chain. This is typically done by calling call_next() with the message or command as an argument, or by using super() to call the next implementation of the method.
Context Access: All middleware methods have access to the FastStream context via self.context.
Broker-specific Extensions: If the basic publish command does not meet your needs, use typed publish commands (such as KafkaPublishCommand and RabbitPublishCommand) to access and manipulate broker-specific attributes when publishing messages.
To choose the right method for your needs, think about the stage you want to intervene in: on_receive for the initial message arrival, consume_scope to wrap the core processing logic, publish_scope for outgoing messages, and after_processed for post-processing and cleanup.
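The chain-of-responsibility point above can be illustrated with a standard-library sketch that shows what happens when one link does not call `call_next`. The names `build_chain`, `logging_scope`, and `blocking_scope` are hypothetical; this models the pattern and is not how FastStream composes middlewares internally.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable

Next = Callable[[str], Awaitable[Any]]
log: list[str] = []


async def handler(msg: str) -> str:
    log.append("handler")
    return msg.upper()


async def logging_scope(call_next: Next, msg: str) -> Any:
    log.append("logging: before")
    result = await call_next(msg)  # passes control down the chain
    log.append("logging: after")
    return result


async def blocking_scope(call_next: Next, msg: str) -> Any:
    log.append("blocking: stop")
    return None  # never calls call_next, so the handler is skipped


def build_chain(scopes: list, final: Next) -> Next:
    """Wrap `final` so that the first scope in the list runs outermost."""
    chain = final
    for scope in reversed(scopes):
        chain = partial(scope, chain)
    return chain


ok = asyncio.run(build_chain([logging_scope], handler)("hi"))
ok_log = log.copy()

log.clear()
blocked = asyncio.run(build_chain([logging_scope, blocking_scope], handler)("hi"))
blocked_log = log.copy()

print(ok, ok_log)
print(blocked, blocked_log)
```

In the second run the handler never executes, which is the failure mode to watch for when a scope method forgets to await `call_next`.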