
Multiple Brokers

Usually a FastStream application is built around a single broker, but sometimes one process needs to talk to more than one messaging system at the same time. A common case is bridging two systems: consume from one broker and re-publish to another. Another is a gradual migration from one broker to another, where both must run side by side for a while.

To support these scenarios, the FastStream application accepts multiple brokers at once.

Passing Brokers to the Application

Just pass all the brokers you want to run to the FastStream constructor. Each broker keeps its own subscribers and publishers, and the application starts and stops all of them together.

In the example below we consume messages from Kafka and bridge them into NATS:

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.nats import NatsBroker

kafka_broker = KafkaBroker("localhost:9092")
nats_broker = NatsBroker("nats://localhost:4222")

app = FastStream(kafka_broker, nats_broker)


@kafka_broker.subscriber("incoming")
@nats_broker.publisher("outgoing")
async def from_kafka(msg: str) -> str:
    # Bridge the message from Kafka to NATS
    return msg


@nats_broker.subscriber("outgoing")
async def from_nats(msg: str) -> None:
    print(f"Received from NATS: {msg}")

When the application runs, both brokers connect. The from_kafka handler is registered on the kafka_broker, while the @nats_broker.publisher decorator routes its return value to the nats_broker — so a single message coming from Kafka ends up being delivered to a NATS subscriber.

Each broker is an independent object: you attach subscribers and publishers to the exact broker you mean, which makes routing between systems explicit.
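To make the decorator stacking easier to follow, here is a minimal in-memory sketch of the same bridge pattern using only the standard library. `ToyBroker` and everything in it are hypothetical stand-ins invented for illustration, not FastStream classes, and the sketch makes no claim about how FastStream implements routing internally.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


class ToyBroker:
    """Hypothetical in-memory stand-in for a broker (not a FastStream class)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscriber(self, topic: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            # Register the (possibly already wrapped) handler on THIS broker.
            self._handlers[topic].append(func)
            return func

        return decorator

    def publisher(self, topic: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            async def wrapper(msg: Any) -> Any:
                result = await func(msg)
                # Forward the handler's return value to THIS broker's topic.
                await self.publish(result, topic)
                return result

            return wrapper

        return decorator

    async def publish(self, msg: Any, topic: str) -> None:
        # Deliver the message to every handler subscribed to the topic.
        for handler in self._handlers[topic]:
            await handler(msg)


source = ToyBroker("source")
target = ToyBroker("target")
received: list[str] = []


@source.subscriber("incoming")
@target.publisher("outgoing")
async def bridge(msg: str) -> str:
    return msg


@target.subscriber("outgoing")
async def sink(msg: str) -> None:
    received.append(msg)


asyncio.run(source.publish("Hi!", "incoming"))
```

Because each decorator belongs to a specific broker object, the direction of the bridge is visible at the handler definition: the subscriber side names the source, the publisher side names the destination.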

Adding Brokers Dynamically

If you don't have all brokers available at construction time, you can register them later with the add_broker method. It is equivalent to passing the broker to the constructor:

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.nats import NatsBroker

kafka_broker = KafkaBroker("localhost:9092")
nats_broker = NatsBroker("nats://localhost:4222")

app = FastStream(kafka_broker)
app.add_broker(nats_broker)

Note

The first broker you pass (or add) becomes the application's default broker, available as app.broker. All registered brokers are available as the app.brokers list.
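The registration rules above can be modelled with a small toy, shown here as a sketch under stated assumptions. `ToyApp` and `ToyBroker` are hypothetical names, not FastStream's implementation; the attribute names `broker` and `brokers` and the `add_broker` method are taken from the text, and starting the brokers concurrently is an assumption made only for this example.

```python
import asyncio
from typing import Optional


class ToyBroker:
    """Hypothetical broker stub that only tracks its connection state."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False


class ToyApp:
    """Toy application: holds several brokers and drives them together."""

    def __init__(self, *brokers: ToyBroker) -> None:
        self.brokers: list[ToyBroker] = list(brokers)

    def add_broker(self, broker: ToyBroker) -> None:
        # Same effect as passing the broker to the constructor.
        self.brokers.append(broker)

    @property
    def broker(self) -> Optional[ToyBroker]:
        # The first registered broker acts as the default one.
        return self.brokers[0] if self.brokers else None

    async def start(self) -> None:
        # Assumption for this sketch: brokers are started concurrently.
        await asyncio.gather(*(b.start() for b in self.brokers))

    async def stop(self) -> None:
        await asyncio.gather(*(b.stop() for b in self.brokers))


kafka = ToyBroker("kafka")
nats = ToyBroker("nats")

app = ToyApp(kafka)
app.add_broker(nats)


async def main() -> list[bool]:
    await app.start()
    states = [b.running for b in app.brokers]
    await app.stop()
    return states


states_while_running = asyncio.run(main())
```

In this model the default broker is simply the first entry of the list, so the order of registration matters only for `app.broker`, not for which brokers get started.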

Testing

Each broker type has its own TestBroker. Wrap every broker your application uses in the matching test context manager; the in-memory patches then apply to all of them, so the bridge keeps working end to end:

import pytest

from faststream.kafka import TestKafkaBroker
from faststream.nats import TestNatsBroker

from .app import from_kafka, from_nats, kafka_broker, nats_broker


@pytest.mark.asyncio()
async def test_bridge() -> None:
    async with (
        TestKafkaBroker(kafka_broker) as br,
        TestNatsBroker(nats_broker),
    ):
        await br.publish("Hi!", "incoming")

        from_kafka.mock.assert_called_once_with("Hi!")
        from_nats.mock.assert_called_once_with("Hi!")

Publishing a message to the "incoming" topic triggers the from_kafka handler, and its return value reaches the from_nats handler, all without a running Kafka or NATS server.

Same-Type Brokers with a Shared TestBroker

When your application runs several brokers of the same type (for example, two Kafka clusters), you don't need a separate context manager for each one. Pass them all to a single TestBroker instead — it patches every broker you give it and routes messages between them in memory:

import pytest

from faststream.kafka import TestKafkaBroker

from .same_type_app import broker_1, broker_2, from_first, from_second


@pytest.mark.asyncio()
async def test_bridge() -> None:
    async with TestKafkaBroker(broker_1, broker_2) as (br1, _):
        await br1.publish("Hi!", "incoming")

        from_first.mock.assert_called_once_with("Hi!")
        from_second.mock.assert_called_once_with("Hi!")

This is the recommended way to test same-type setups: one TestKafkaBroker(broker_1, broker_2) keeps the brokers wired together, so a message bridged from the first cluster is delivered to a subscriber on the second.
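The `as (br1, _)` unpacking in the test above implies that the shared context manager yields one object per broker, in the order the brokers were passed. The sketch below models only that shape with the standard library. `toy_test_brokers` and `ToyBroker` are hypothetical names, and the `in_memory` flag is an invented placeholder for whatever patching a real test broker performs.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Tuple


class ToyBroker:
    """Hypothetical broker stub; `in_memory` marks whether it is patched."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.in_memory = False


@asynccontextmanager
async def toy_test_brokers(
    *brokers: ToyBroker,
) -> AsyncIterator[Tuple[ToyBroker, ...]]:
    # Patch every broker on entry.
    for broker in brokers:
        broker.in_memory = True
    try:
        # Hand the brokers back in the order they were passed.
        yield brokers
    finally:
        # Restore every broker on exit, even if the test body raised.
        for broker in brokers:
            broker.in_memory = False


first, second = ToyBroker("first"), ToyBroker("second")


async def main() -> Tuple[bool, bool]:
    async with toy_test_brokers(first, second) as (br1, br2):
        # Unpacking mirrors the order of the arguments.
        assert br1 is first and br2 is second
        return br1.in_memory, br2.in_memory


inside = asyncio.run(main())
```

A single context manager that owns all brokers can patch and restore them as one unit, which is the property the shared-TestBroker style relies on.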

Note

The two styles can be combined: use one shared TestBroker per broker type, and enter those context managers together when your application mixes different types (e.g. TestKafkaBroker(...) together with TestNatsBroker(...)).