Redis Pipeline
FastStream supports Redis pipelining for publishing many messages at once. A pipeline queues several Redis commands on the client and sends them in a single network round-trip, so publishing N messages costs one round-trip instead of N.
Usage Example
API
You can pass the pipeline parameter to the publish method to defer the execution of Redis commands. The commands are only queued at that point; they are sent to Redis when you explicitly call await pipe.execute().
The pipeline object is injected by the Pipeline annotation:
Pipeline is a Redis pipeline object (redis.asyncio.client.Pipeline), which is wrapped in a FastStream dependency and will be automatically available in any subscriber.
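The deferred behaviour described above can be illustrated with a small in-memory model. This is only a sketch: ToyPipeline and ToyBroker are hypothetical stand-ins that mimic the publish(..., pipeline=pipe) and await pipe.execute() call shape, not FastStream's or redis-py's implementation, and the "server" is a plain dictionary rather than a Redis connection.

```python
import asyncio


class ToyPipeline:
    """Hypothetical stand-in for a Redis pipeline: buffers commands locally."""

    def __init__(self, server: dict) -> None:
        self._server = server
        self._queued: list = []
        self.round_trips = 0

    def queue(self, destination: str, message: str) -> None:
        # Nothing is sent yet; the command only joins the local buffer.
        self._queued.append((destination, message))

    async def execute(self) -> list:
        # One simulated network round-trip flushes every buffered command.
        self.round_trips += 1
        results = []
        for destination, message in self._queued:
            self._server.setdefault(destination, []).append(message)
            results.append(len(self._server[destination]))
        self._queued.clear()
        return results  # one result per queued command


class ToyBroker:
    """Hypothetical stand-in that mimics only the publish(..., pipeline=...) shape."""

    def __init__(self, server: dict) -> None:
        self._server = server
        self.round_trips = 0

    async def publish(self, message: str, channel: str, pipeline=None) -> None:
        if pipeline is not None:
            pipeline.queue(channel, message)  # deferred until pipe.execute()
            return
        self.round_trips += 1  # immediate send: one round-trip per message
        self._server.setdefault(channel, []).append(message)


async def main():
    server: dict = {}
    broker = ToyBroker(server)
    pipe = ToyPipeline(server)

    for i in range(3):
        await broker.publish(f"hello {i}", channel="test-output", pipeline=pipe)

    # Snapshot taken before execute(): the toy server has received nothing.
    delivered_before = list(server.get("test-output", []))
    results = await pipe.execute()
    return delivered_before, server["test-output"], results, pipe.round_trips


delivered_before, delivered_after, results, round_trips = asyncio.run(main())
```

In this model the three publishes reach the server only after execute() is awaited, and they cost a single simulated round-trip. In a real subscriber you would not construct the pipeline yourself; it arrives through the Pipeline annotation.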
Batch Publishing with Pipeline
When broker.publish_batch() is called with the pipeline parameter, the messages are queued in the pipeline and nothing is sent until you call await pipe.execute(). After that call, the subscriber receives all of the messages as a single batch and can handle them in one execution instead of one message at a time.
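A minimal in-memory sketch of that sequence follows. The names ToyListPipeline, publish_batch, and consume_batch are hypothetical helpers written for this illustration; they model the described ordering (queue, execute, then one batch delivery) and are not the FastStream API.

```python
import asyncio


class ToyListPipeline:
    """Hypothetical stand-in: buffers list pushes until execute() is awaited."""

    def __init__(self, lists: dict) -> None:
        self._lists = lists
        self._queued: list = []

    def queue_push(self, list_name: str, messages: list) -> None:
        self._queued.append((list_name, messages))

    async def execute(self) -> None:
        # Flush every queued push into the toy server's lists.
        for list_name, messages in self._queued:
            self._lists.setdefault(list_name, []).extend(messages)
        self._queued.clear()


async def publish_batch(*messages: str, list_name: str, pipeline: ToyListPipeline) -> None:
    # Models the described behaviour: the whole batch becomes one queued push.
    pipeline.queue_push(list_name, list(messages))


def consume_batch(lists: dict, list_name: str) -> list:
    # A batch consumer drains everything currently in the list in one call.
    batch, lists[list_name] = lists.get(list_name, []), []
    return batch


async def main():
    lists: dict = {}
    pipe = ToyListPipeline(lists)

    await publish_batch("a", "b", "c", list_name="test-output", pipeline=pipe)
    seen_before = consume_batch(lists, "test-output")  # nothing flushed yet

    await pipe.execute()
    seen_after = consume_batch(lists, "test-output")  # whole batch at once
    return seen_before, seen_after


seen_before, seen_after = asyncio.run(main())
```

The consumer sees an empty list before execute() and the complete batch afterwards, which is the ordering guarantee this section relies on.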
Notes
- Pipelining is supported for all Redis queue types, including channels, lists, and streams.
- You can combine multiple queue types in a single pipeline.
Benefits
- Reduces network traffic by batching Redis commands.
- Improves performance in high-volume scenarios.
- Fully integrates with FastStream's dependency injection system.
- Allows for efficient batch processing when using broker.publish_batch() and pipeline, as all messages are processed as a single entity by the subscriber after await pipe.execute().