
FastStream#

Effortless event stream integration for your services




Features#

FastStream simplifies the process of writing producers and consumers for message queues, handling all the parsing, networking and documentation generation automatically.

Making streaming microservices has never been easier. Designed with junior developers in mind, FastStream simplifies your work while keeping the door open for more advanced use cases. Here's a look at the core features that make FastStream a go-to framework for modern, data-centric microservices.

  • Multiple Brokers: FastStream provides a unified API to work across multiple message brokers (Kafka, RabbitMQ, NATS, and Redis are supported)

  • Built-in Serialization: Leverage Pydantic or Msgspec validation capabilities to serialize and validate incoming messages

  • Automatic Docs: Stay ahead with automatic AsyncAPI documentation

  • Intuitive: Fully typed editor support makes your development experience smooth, catching errors before they reach runtime

  • Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in DI system

  • Testable: Supports in-memory tests, making your CI/CD pipeline faster and more reliable

  • Extensible: Use extensions for lifespans, custom serialization and middleware

  • Integrations: FastStream is fully compatible with any HTTP framework you want (FastAPI especially)

That's FastStream in a nutshell - easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, FastStream has got you covered.

Project History

FastStream is a package based on the ideas and experiences gained from FastKafka and Propan. By joining our forces, we picked up the best from both packages and created a unified way to write services capable of processing streamed data regardless of the underlying protocol.


Versioning Policy#

FastStream has a stable public API. Only major updates may introduce breaking changes.

Prior to FastStream's 1.0 release, each minor update is treated as a major one and may introduce breaking changes. To ensure a smooth transition, these changes are communicated through deprecation warnings before being fully implemented. Our team is working towards the stable 1.0 release.


Install#

FastStream works on Linux, macOS, Windows and most Unix-style operating systems. You can install it with pip as usual, choosing the extra that matches your broker:

pip install 'faststream[kafka]'
pip install 'faststream[confluent]'
pip install 'faststream[rabbit]'
pip install 'faststream[nats]'
pip install 'faststream[redis]'

Writing app code#

FastStream brokers provide convenient function decorators @broker.subscriber(...) and @broker.publisher(...) to allow you to delegate the actual process of:

  • consuming data from and producing data to event queues, and

  • decoding and encoding JSON-encoded messages

These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.

FastStream also uses Pydantic to parse JSON-encoded input into Python objects, which makes structured data easy to work with: you can parse and validate incoming messages using type annotations alone.

Here is an example Python app using FastStream that consumes data from an incoming data stream and outputs the data to another one:

Kafka:

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

Confluent:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

RabbitMQ:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("in-queue")
@broker.publisher("out-queue")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

NATS:

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

@broker.subscriber("in-subject")
@broker.publisher("out-subject")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

Redis:

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

@broker.subscriber("in-channel")
@broker.publisher("out-channel")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"
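
To make the idea of annotation-driven decoding concrete, here is a minimal standard-library sketch. It is not how FastStream is implemented (as noted above, FastStream delegates parsing to Pydantic). The helper `call_with_json` is hypothetical, uses a synchronous handler for brevity, and only handles flat JSON objects whose fields map to simple types such as `str` and `int`.

```python
import inspect
import json
from typing import get_type_hints


def call_with_json(handler, raw: bytes):
    """Decode a JSON object and pass its fields to `handler` by name.

    Each value is coerced with the parameter's annotated type.
    Illustration only: no nested models, no error reporting.
    """
    payload = json.loads(raw)
    hints = get_type_hints(handler)
    kwargs = {}
    for name in inspect.signature(handler).parameters:
        value = payload[name]
        expected = hints.get(name)
        # Coerce only when the parameter carries an annotation.
        kwargs[name] = expected(value) if expected is not None else value
    return handler(**kwargs)


def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"


# "user_id" arrives as a string and is coerced to int by its annotation.
print(call_with_json(handle_msg, b'{"user": "John", "user_id": "1"}'))
```

In a real service you would not write such a helper yourself; declaring the annotations on the handler is enough.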

Pydantic serialization#

Also, Pydantic’s BaseModel class allows you to define messages using a declarative syntax, making it easy to specify the fields and types of your messages.

Kafka:

from pydantic import BaseModel, Field, PositiveInt
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

class User(BaseModel):
    user: str = Field(..., examples=["John"])
    user_id: PositiveInt = Field(..., examples=[1])

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(data: User) -> str:
    return f"User: {data.user} - {data.user_id} registered"

Confluent:

from pydantic import BaseModel, Field, PositiveInt
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

class User(BaseModel):
    user: str = Field(..., examples=["John"])
    user_id: PositiveInt = Field(..., examples=[1])

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(data: User) -> str:
    return f"User: {data.user} - {data.user_id} registered"

RabbitMQ:

from pydantic import BaseModel, Field, PositiveInt
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

class User(BaseModel):
    user: str = Field(..., examples=["John"])
    user_id: PositiveInt = Field(..., examples=[1])

@broker.subscriber("in-queue")
@broker.publisher("out-queue")
async def handle_msg(data: User) -> str:
    return f"User: {data.user} - {data.user_id} registered"

NATS:

from pydantic import BaseModel, Field, PositiveInt
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

class User(BaseModel):
    user: str = Field(..., examples=["John"])
    user_id: PositiveInt = Field(..., examples=[1])

@broker.subscriber("in-subject")
@broker.publisher("out-subject")
async def handle_msg(data: User) -> str:
    return f"User: {data.user} - {data.user_id} registered"

Redis:

from pydantic import BaseModel, Field, PositiveInt
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

class User(BaseModel):
    user: str = Field(..., examples=["John"])
    user_id: PositiveInt = Field(..., examples=[1])

@broker.subscriber("in-channel")
@broker.publisher("out-channel")
async def handle_msg(data: User) -> str:
    return f"User: {data.user} - {data.user_id} registered"

By default, FastStream uses Pydantic V2 (whose core is written in Rust) as its serialization library. If your platform has no Rust support, you can downgrade manually: FastStream also works correctly with Pydantic V1.

To choose the Pydantic version, install the one you need with a regular pip command:

pip install pydantic==1.X.Y

FastStream (and FastDepends inside) should work correctly with almost any version.

Msgspec serialization#

Moreover, FastStream is not tied to any specific serialization library, so you can use any preferred one. Fortunately, we provide a built‑in alternative for the most popular Pydantic replacement - Msgspec.

Kafka:

from fast_depends.msgspec import MsgSpecSerializer
from faststream.kafka import KafkaBroker

broker = KafkaBroker(serializer=MsgSpecSerializer())

Confluent:

from fast_depends.msgspec import MsgSpecSerializer
from faststream.confluent import KafkaBroker

broker = KafkaBroker(serializer=MsgSpecSerializer())

RabbitMQ:

from fast_depends.msgspec import MsgSpecSerializer
from faststream.rabbit import RabbitBroker

broker = RabbitBroker(serializer=MsgSpecSerializer())

NATS:

from fast_depends.msgspec import MsgSpecSerializer
from faststream.nats import NatsBroker

broker = NatsBroker(serializer=MsgSpecSerializer())

Redis:

from fast_depends.msgspec import MsgSpecSerializer
from faststream.redis import RedisBroker

broker = RedisBroker(serializer=MsgSpecSerializer())

You can read more about the feature in the documentation.

Unified API#

At first glance, FastStream unifies various broker backends under a single API. However, a completely unified API inevitably results in missing features. We do not want to limit users' choices. If you prefer Kafka over Redis, there is a reason. Therefore, we support all native broker features you need.

Consequently, our unified API has a relatively limited scope:

Kafka:

from faststream.kafka import KafkaBroker, KafkaMessage

broker = KafkaBroker("localhost:9092")

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handler(msg: KafkaMessage) -> None:
    await msg.ack()  # control brokers' acknowledgement policy

...

await broker.publish("Message", "in-topic")

Confluent:

from faststream.confluent import KafkaBroker, KafkaMessage

broker = KafkaBroker("localhost:9092")

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handler(msg: KafkaMessage) -> None:
    await msg.ack()  # control brokers' acknowledgement policy

...

await broker.publish("Message", "in-topic")

RabbitMQ:

from faststream.rabbit import RabbitBroker, RabbitMessage

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

@broker.subscriber("in-queue")
@broker.publisher("out-queue")
async def handler(msg: RabbitMessage) -> None:
    await msg.ack()  # control brokers' acknowledgement policy

...

await broker.publish("Message", "in-queue")

NATS:

from faststream.nats import NatsBroker, NatsMessage

broker = NatsBroker("nats://localhost:4222")

@broker.subscriber("in-subject")
@broker.publisher("out-subject")
async def handler(msg: NatsMessage) -> None:
    await msg.ack()  # control brokers' acknowledgement policy

...

await broker.publish("Message", "in-subject")

Redis:

from faststream.redis import RedisBroker, RedisMessage

broker = RedisBroker("redis://localhost:6379")

@broker.subscriber("in-channel")
@broker.publisher("out-channel")
async def handler(msg: RedisMessage) -> None:
    await msg.ack()  # control brokers' acknowledgement policy

...

await broker.publish("Message", "in-channel")

Beyond this scope you can use any broker-native features you need:

  • Kafka - specific partition reads, partitioner control, consumer groups, batch processing, etc.
  • RabbitMQ - all exchange types, RabbitMQ Streams, RPC, manual channel configuration, DLQ, etc.
  • NATS - core and Push/Pull JetStream subscribers, KeyValue, ObjectStorage, RPC, etc.
  • Redis - Pub/Sub, List, Stream subscribers, consumer groups, acknowledgements, etc.

You can find detailed information about all supported features in FastStream’s broker‑specific documentation.

If a particular feature is missing or not yet supported, you can always fall back to the native broker client/connection for those operations.


Testing the service#

The service can be tested using the TestBroker context managers, which by default put the broker into "testing mode".

The tester redirects your subscriber- and publisher-decorated functions to in-memory brokers, so you can quickly test your app without a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

Kafka:

import pytest
from pydantic import ValidationError
from faststream.kafka import TestKafkaBroker

from basic import broker  # assumes the app code above is saved as basic.py


@pytest.mark.asyncio
async def test_correct() -> None:
    async with TestKafkaBroker(broker) as br:
        await br.publish(
            {
                "user": "John",
                "user_id": 1,
            }, "in-topic",
        )

@pytest.mark.asyncio
async def test_invalid() -> None:
    async with TestKafkaBroker(broker) as br:
        with pytest.raises(ValidationError):
            await br.publish("wrong message", "in-topic")

Confluent:

import pytest
from pydantic import ValidationError
from faststream.confluent import TestKafkaBroker

from basic import broker  # assumes the app code above is saved as basic.py


@pytest.mark.asyncio
async def test_correct() -> None:
    async with TestKafkaBroker(broker) as br:
        await br.publish(
            {
                "user": "John",
                "user_id": 1,
            }, "in-topic",
        )

@pytest.mark.asyncio
async def test_invalid() -> None:
    async with TestKafkaBroker(broker) as br:
        with pytest.raises(ValidationError):
            await br.publish("wrong message", "in-topic")

RabbitMQ:

import pytest
from pydantic import ValidationError
from faststream.rabbit import TestRabbitBroker

from basic import broker  # assumes the app code above is saved as basic.py


@pytest.mark.asyncio
async def test_correct() -> None:
    async with TestRabbitBroker(broker) as br:
        await br.publish(
            {
                "user": "John",
                "user_id": 1,
            }, "in-queue",
        )

@pytest.mark.asyncio
async def test_invalid() -> None:
    async with TestRabbitBroker(broker) as br:
        with pytest.raises(ValidationError):
            await br.publish("wrong message", "in-queue")

NATS:

import pytest
from pydantic import ValidationError
from faststream.nats import TestNatsBroker

from basic import broker  # assumes the app code above is saved as basic.py


@pytest.mark.asyncio
async def test_correct() -> None:
    async with TestNatsBroker(broker) as br:
        await br.publish(
            {
                "user": "John",
                "user_id": 1,
            }, "in-subject",
        )

@pytest.mark.asyncio
async def test_invalid() -> None:
    async with TestNatsBroker(broker) as br:
        with pytest.raises(ValidationError):
            await br.publish("wrong message", "in-subject")

Redis:

import pytest
from pydantic import ValidationError
from faststream.redis import TestRedisBroker

from basic import broker  # assumes the app code above is saved as basic.py


@pytest.mark.asyncio
async def test_correct() -> None:
    async with TestRedisBroker(broker) as br:
        await br.publish(
            {
                "user": "John",
                "user_id": 1,
            }, "in-channel",
        )

@pytest.mark.asyncio
async def test_invalid() -> None:
    async with TestRedisBroker(broker) as br:
        with pytest.raises(ValidationError):
            await br.publish("wrong message", "in-channel")
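
For intuition about what "redirecting to an in-memory broker" means, the following is a toy sketch built only on the standard library. `ToyInMemoryBroker` is a hypothetical stand-in, not FastStream's TestBroker: it simply hands each published message straight to the handlers registered for that destination, with no network involved.

```python
import asyncio
from collections import defaultdict


class ToyInMemoryBroker:
    """Hypothetical stand-in: routes published messages straight to handlers."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)
        self.published = []  # (destination, message) pairs kept for assertions

    def subscriber(self, destination: str):
        def decorator(func):
            # Register the handler and return it unchanged.
            self._handlers[destination].append(func)
            return func

        return decorator

    async def publish(self, message, destination: str) -> None:
        self.published.append((destination, message))
        # Deliver synchronously within the same event loop, no network I/O.
        for handler in self._handlers[destination]:
            await handler(message)


broker = ToyInMemoryBroker()
seen = []


@broker.subscriber("in-topic")
async def handle_msg(message: dict) -> None:
    seen.append(message["user"])


asyncio.run(broker.publish({"user": "John", "user_id": 1}, "in-topic"))
print(seen)
```

Because delivery happens inside the `publish` call, a test can assert on the handler's effects immediately afterwards, which is the property that makes in-memory testing fast.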

Running the application#

The application can be started using the built-in FastStream CLI command.

Note

Before running the service, install FastStream CLI using the following command:

pip install "faststream[cli]"

To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app symbol to the command.

faststream run basic:app

After running the command, you should see the following output:

INFO     - FastStream app starting...
INFO     - input_data |            - `HandleMsg` waiting for messages
INFO     - FastStream app started successfully! To exit press CTRL+C

FastStream also provides a hot reload feature to improve your development experience:

faststream run basic:app --reload

It also supports horizontal scaling across multiple worker processes:

faststream run basic:app --workers 3

You can learn more about CLI features in the CLI documentation.


Project Documentation#

FastStream automatically generates documentation for your project according to the AsyncAPI specification. You can work with both generated artifacts and place a web view of your documentation on resources available to related teams.

The availability of such documentation significantly simplifies the integration of services: you can immediately see what channels and message formats the application works with. And most importantly, it won't cost anything - FastStream has already created the docs for you!

[Figure: HTML page with the generated AsyncAPI documentation]


Dependencies#

FastStream (thanks to FastDepends) has a dependency management system that resembles both pytest fixtures and FastAPI's Depends. Function arguments declare which dependencies are needed, and a special decorator delivers them from the global Context object.

With a default value:

from faststream import Depends, Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker()

async def base_dep(user_id: int) -> bool:
    return True

@broker.subscriber("in-test")
async def base_handler(
    user: str,
    logger: Logger,
    dep: bool = Depends(base_dep),
):
    assert dep is True
    logger.info(user)

With Annotated:

from typing import Annotated

from faststream import Depends, Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker()

async def base_dep(user_id: int) -> bool:
    return True

@broker.subscriber("in-test")
async def base_handler(
    user: str,
    logger: Logger,
    dep: Annotated[bool, Depends(base_dep)],
):
    assert dep is True
    logger.info(user)
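
The mechanism described above (arguments declare dependencies, a decorator resolves them) can be sketched with the standard library alone. This is only an illustration under stated assumptions: the `Depends` class and `inject` decorator below are hypothetical toys, not the FastDepends implementation, and they only handle dependencies declared as default values.

```python
import asyncio
import inspect


class Depends:
    """Hypothetical marker meaning: call this function to produce the argument."""

    def __init__(self, dependency):
        self.dependency = dependency


def inject(func):
    """Resolve parameters whose default is a Depends marker, then call func."""

    async def wrapper(**message_fields):
        kwargs = dict(message_fields)
        for name, param in inspect.signature(func).parameters.items():
            if isinstance(param.default, Depends):
                dep = param.default.dependency
                # Give the dependency only the message fields it asks for.
                wanted = inspect.signature(dep).parameters
                dep_kwargs = {k: v for k, v in message_fields.items() if k in wanted}
                kwargs[name] = await dep(**dep_kwargs)
        # Drop message fields the handler itself does not declare.
        accepted = inspect.signature(func).parameters
        return await func(**{k: v for k, v in kwargs.items() if k in accepted})

    return wrapper


async def base_dep(user_id: int) -> bool:
    return user_id > 0


@inject
async def base_handler(user: str, dep: bool = Depends(base_dep)) -> str:
    return f"{user}: dep={dep}"


print(asyncio.run(base_handler(user="John", user_id=1)))
```

Note how `user_id` never reaches the handler: it is consumed by `base_dep`, and only the dependency's result is passed on.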

HTTP Frameworks integrations#

Any Framework#

You can use FastStream MQBrokers without a FastStream application. Just start and stop them according to your application's lifespan.

Litestar:

from litestar import Litestar, get
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

@broker.subscriber("queue")
async def handle(msg):
    print(msg)

@get("/")
async def index() -> str:
    return "Hello, world!"

app = Litestar(
    [index],
    on_startup=[broker.start],
    on_shutdown=[broker.stop],
)

aiohttp:

from aiohttp import web

from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")


@broker.subscriber("test")
async def base_handler(body):
    print(body)


async def start_broker(app):
    await broker.start()


async def stop_broker(app):
    await broker.stop()


async def hello(request):
    return web.Response(text="Hello, world")


app = web.Application()
app.add_routes([web.get("/", hello)])
app.on_startup.append(start_broker)
app.on_cleanup.append(stop_broker)


if __name__ == "__main__":
    web.run_app(app)

Blacksheep:

from blacksheep import Application

from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

app = Application()


@broker.subscriber("test")
async def base_handler(body):
    print(body)


@app.on_start
async def start_broker(application: Application) -> None:
    await broker.start()


@app.on_stop
async def stop_broker(application: Application) -> None:
    await broker.stop()


@app.route("/")
async def home():
    return "Hello, World!"

Falcon:

import falcon
import falcon.asgi

from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")


@broker.subscriber("test")
async def base_handler(body):
    print(body)


class ThingsResource:
    async def on_get(self, req, resp):
        resp.status = falcon.HTTP_200
        resp.content_type = falcon.MEDIA_TEXT
        resp.text = (
            "\nTwo things awe me most, the starry sky "
            "above me and the moral law within me.\n"
            "\n"
            "    ~ Immanuel Kant\n\n"
        )


class StreamMiddleware:
    async def process_startup(self, scope, event):
        await broker.start()

    async def process_shutdown(self, scope, event):
        await broker.stop()


app = falcon.asgi.App()
app.add_middleware(StreamMiddleware())
app.add_route("/things", ThingsResource())

Quart:

from quart import Quart

from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

app = Quart(__name__)


@broker.subscriber("test")
async def base_handler(body):
    print(body)


@app.before_serving
async def start_broker():
    await broker.start()


@app.after_serving
async def stop_broker():
    await broker.stop()


@app.route("/")
async def json():
    return {"hello": "world"}

Sanic:

from sanic import Sanic
from sanic.response import text

from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

app = Sanic("MyHelloWorldApp")


@broker.subscriber("test")
async def base_handler(body):
    print(body)


@app.after_server_start
async def start_broker(app, loop):
    await broker.start()


@app.after_server_stop
async def stop_broker(app, loop):
    await broker.stop()


@app.get("/")
async def hello_world(request):
    return text("Hello, world.")
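
All of the framework examples above follow the same pattern: start the broker when the host application starts, and stop it when the application shuts down. A framework-agnostic way to express that pattern is an async context manager. The sketch below uses a hypothetical `StubBroker` so that it runs without any broker installed; a real FastStream broker would be passed in its place, relying only on the `start()` and `stop()` coroutines shown in the examples.

```python
import asyncio
from contextlib import asynccontextmanager


class StubBroker:
    """Stand-in exposing the same start()/stop() coroutines used above."""

    def __init__(self) -> None:
        self.events = []

    async def start(self) -> None:
        self.events.append("start")

    async def stop(self) -> None:
        self.events.append("stop")


@asynccontextmanager
async def broker_lifespan(broker):
    """Start the broker on entry and always stop it on exit."""
    await broker.start()
    try:
        yield broker
    finally:
        # Runs even if the application body raises.
        await broker.stop()


async def main() -> list:
    broker = StubBroker()
    async with broker_lifespan(broker):
        broker.events.append("serving")  # the host application would run here
    return broker.events


print(asyncio.run(main()))
```

The `try`/`finally` ensures the broker is stopped even when the application exits with an error, which the per-framework shutdown hooks above achieve in their own ways.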

FastAPI Plugin#

Also, FastStream can be used as part of FastAPI.

Just import the StreamRouter you need and declare the message handler with the same @router.subscriber(...) and @router.publisher(...) decorators.

Kafka:

from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.kafka.fastapi import KafkaRouter, Logger

router = KafkaRouter("localhost:9092")

class Incoming(BaseModel):
    m: dict

def call() -> bool:
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(message: Incoming, logger: Logger, dependency: bool = Depends(call)):
    logger.info("Incoming value: %s, depends value: %s" % (message.m, dependency))
    return {"response": "Hello, Kafka!"}

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app = FastAPI()
app.include_router(router)

Confluent:

from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.confluent.fastapi import KafkaRouter, Logger

router = KafkaRouter("localhost:9092")

class Incoming(BaseModel):
    m: dict

def call() -> bool:
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(message: Incoming, logger: Logger, dependency: bool = Depends(call)):
    logger.info("Incoming value: %s, depends value: %s" % (message.m, dependency))
    return {"response": "Hello, Kafka!"}

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app = FastAPI()
app.include_router(router)

RabbitMQ:

from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.rabbit.fastapi import RabbitRouter, Logger

router = RabbitRouter("amqp://guest:guest@localhost:5672/")

class Incoming(BaseModel):
    m: dict

def call() -> bool:
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(message: Incoming, logger: Logger, dependency: bool = Depends(call)):
    logger.info("Incoming value: %s, depends value: %s" % (message.m, dependency))
    return {"response": "Hello, Rabbit!"}

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app = FastAPI()
app.include_router(router)

NATS:

from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.nats.fastapi import NatsRouter, Logger

router = NatsRouter("nats://localhost:4222")

class Incoming(BaseModel):
    m: dict

def call() -> bool:
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(message: Incoming, logger: Logger, dependency: bool = Depends(call)):
    logger.info("Incoming value: %s, depends value: %s" % (message.m, dependency))
    return {"response": "Hello, NATS!"}

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app = FastAPI()
app.include_router(router)

Redis:

from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.redis.fastapi import RedisRouter, Logger

router = RedisRouter("redis://localhost:6379")

class Incoming(BaseModel):
    m: dict

def call() -> bool:
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(message: Incoming, logger: Logger, dependency: bool = Depends(call)):
    logger.info("Incoming value: %s, depends value: %s" % (message.m, dependency))
    return {"response": "Hello, Redis!"}

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app = FastAPI()
app.include_router(router)

Note

More integration features are described in the integrations documentation.


Stay in touch#

Please show your support and stay in touch through the project's Discord server and Telegram group.

Your support helps us to stay in touch with you and encourages us to continue developing and improving the framework. Thank you for your support!


Contributors#

Thanks to all of these amazing people who made the project better!