Dynamic Subscribers#

Sometimes you need to consume messages from a source that is not known at startup. The source may only become known later: it can arrive in an incoming message or request, or it can be generated at random, for example a temporary queue created to receive a response. In these cases, you cannot use FastStream's regular @broker.subscriber() decorators.

For such cases, FastStream lets you create subscribers dynamically at runtime.

Warning

Dynamic subscribers are not supported by TestBroker.

The following test examples will not work:

from faststream.kafka import KafkaBroker, TestKafkaBroker

broker = KafkaBroker()

async def main():
    async with TestKafkaBroker(broker) as br:
        subscriber = br.subscriber("test-topic", persistent=False)

        await subscriber.start()
        message = await subscriber.get_one()  # does not work
        await subscriber.stop()

from faststream.confluent import KafkaBroker, TestKafkaBroker

broker = KafkaBroker()

async def main():
    async with TestKafkaBroker(broker) as br:
        subscriber = br.subscriber("test-topic", persistent=False)

        await subscriber.start()
        message = await subscriber.get_one()  # does not work
        await subscriber.stop()

from faststream.rabbit import RabbitBroker, TestRabbitBroker

broker = RabbitBroker()

async def main():
    async with TestRabbitBroker(broker) as br:
        subscriber = br.subscriber("test-queue", persistent=False)

        await subscriber.start()
        message = await subscriber.get_one()  # does not work
        await subscriber.stop()

from faststream.nats import NatsBroker, TestNatsBroker

broker = NatsBroker()

async def main():
    async with TestNatsBroker(broker) as br:
        subscriber = br.subscriber("test-subject", persistent=False)

        await subscriber.start()
        message = await subscriber.get_one()  # does not work
        await subscriber.stop()

from faststream.redis import RedisBroker, TestRedisBroker

broker = RedisBroker()

async def main():
    async with TestRedisBroker(broker) as br:
        subscriber = br.subscriber("test-channel", persistent=False)

        await subscriber.start()
        message = await subscriber.get_one()  # does not work
        await subscriber.stop()

Consuming a Single Message#

To consume a single message, create a subscriber, start it, and call its get_one() method.

from faststream.kafka import KafkaBroker, KafkaMessage

async def main():
    async with KafkaBroker() as broker:  # connect the broker
        subscriber = broker.subscriber("test-topic", persistent=False)
        await subscriber.start()

        message: KafkaMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

Important

Do not forget to start and stop the subscriber manually:

        await subscriber.start()

        message: KafkaMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

from faststream.confluent import KafkaBroker, KafkaMessage

async def main():
    async with KafkaBroker() as broker:  # connect the broker
        subscriber = broker.subscriber("test-topic", persistent=False)
        await subscriber.start()

        message: KafkaMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

Important

Do not forget to start and stop the subscriber manually:

        await subscriber.start()

        message: KafkaMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

from faststream.rabbit import RabbitBroker, RabbitMessage

async def main():
    async with RabbitBroker() as broker:  # connect the broker
        subscriber = broker.subscriber("test-queue", persistent=False)
        await subscriber.start()

        message: RabbitMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

Important

Do not forget to start and stop the subscriber manually:

        await subscriber.start()

        message: RabbitMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

from faststream.nats import NatsBroker, NatsMessage

async def main():
    async with NatsBroker() as broker:  # connect the broker
        subscriber = broker.subscriber("test-subject", persistent=False)
        await subscriber.start()

        message: NatsMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

Important

Do not forget to start and stop the subscriber manually:

        await subscriber.start()

        message: NatsMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

from faststream.redis import RedisBroker, RedisChannelMessage

async def main():
    async with RedisBroker() as broker:  # connect the broker
        subscriber = broker.subscriber("test-channel", persistent=False)
        await subscriber.start()

        message: RedisChannelMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()

Important

Do not forget to start and stop the subscriber manually:

        await subscriber.start()

        message: RedisChannelMessage | None = await subscriber.get_one(timeout=3.0)

        await subscriber.stop()
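
Because a dynamic subscriber has to be started and stopped by hand, it can help to pair the two calls so that stop() still runs when processing raises. The sketch below shows one possible way to do that with an async context manager. Both names are assumptions made for illustration: `running` is a hypothetical helper, not part of FastStream, and `FakeSubscriber` is a stand-in that only mimics the start(), get_one(), and stop() methods used on this page so the example runs without a broker. It also assumes, as the `| None` annotations above suggest, that get_one() returns None when the timeout expires.

```python
import asyncio
from collections import deque
from contextlib import asynccontextmanager


class FakeSubscriber:
    """Stand-in for a dynamic subscriber (hypothetical, not a FastStream class)."""

    def __init__(self, messages=()):
        self._pending = deque(messages)
        self.started = False

    async def start(self):
        self.started = True

    async def stop(self):
        self.started = False

    async def get_one(self, timeout=5.0):
        # Return a waiting message, or None once the timeout has elapsed.
        if self._pending:
            return self._pending.popleft()
        await asyncio.sleep(timeout)
        return None


@asynccontextmanager
async def running(subscriber):
    """Start the subscriber and make sure stop() is called on exit."""
    await subscriber.start()
    try:
        yield subscriber
    finally:
        await subscriber.stop()


async def main():
    subscriber = FakeSubscriber(["hello"])
    async with running(subscriber) as sub:
        first = await sub.get_one(timeout=0.1)   # a message is waiting
        second = await sub.get_one(timeout=0.1)  # nothing left, so this times out
    return first, second, subscriber.started


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a real broker, the same helper could wrap the object returned by broker.subscriber(..., persistent=False); a plain try/finally around start() and stop() achieves the same effect without the helper.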

Iterating Over Messages#

To process a stream of messages dynamically, avoid polling get_one() in a loop like this:

ugly_example.py
while True:
    msg = await subscriber.get_one(timeout=3.0)
    if msg:
        ...  # process the message

It would be much better to use the built-in iteration mechanism:

from faststream.kafka import KafkaBroker, KafkaMessage

async def main():
    async with KafkaBroker() as broker:
        subscriber = broker.subscriber("test-topic", persistent=False)
        await subscriber.start()

        async for msg in subscriber:  # msg is a KafkaMessage
            ...  # process the message

        await subscriber.stop()

from faststream.confluent import KafkaBroker, KafkaMessage

async def main():
    async with KafkaBroker() as broker:
        subscriber = broker.subscriber("test-topic", persistent=False)
        await subscriber.start()

        async for msg in subscriber:  # msg is a KafkaMessage
            ...  # process the message

        await subscriber.stop()

from faststream.rabbit import RabbitBroker, RabbitMessage

async def main():
    async with RabbitBroker() as broker:
        subscriber = broker.subscriber("test-queue", persistent=False)
        await subscriber.start()

        async for msg in subscriber:  # msg is a RabbitMessage
            ...  # process the message

        await subscriber.stop()

from faststream.nats import NatsBroker, NatsMessage

async def main():
    async with NatsBroker() as broker:
        subscriber = broker.subscriber("test-subject", persistent=False)
        await subscriber.start()

        async for msg in subscriber:  # msg is a NatsMessage
            ...  # process the message

        await subscriber.stop()

from faststream.redis import RedisBroker, RedisMessage

async def main():
    async with RedisBroker() as broker:
        subscriber = broker.subscriber("test-channel", persistent=False)
        await subscriber.start()

        async for msg in subscriber:  # msg is a RedisMessage
            ...  # process the message

        await subscriber.stop()
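
In the examples above, the stop() call after the loop is only reached once the loop itself is left. Assuming the iterator keeps waiting for new messages rather than finishing on its own, you may want an explicit exit condition. The following is a minimal sketch of that idea, not FastStream code: `consume_batch` is a hypothetical helper, and `FakeSubscriber` is a stand-in whose iterator yields a few preset messages and then waits forever, the way an open stream would.

```python
import asyncio


class FakeSubscriber:
    """Stand-in for a dynamic subscriber (hypothetical, not a FastStream class)."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.started = False

    async def start(self):
        self.started = True

    async def stop(self):
        self.started = False

    async def __aiter__(self):
        for message in self._messages:
            yield message
        # Like an open stream, never finish on its own.
        await asyncio.Event().wait()


async def consume_batch(subscriber, limit):
    """Process at most `limit` messages, then stop the subscriber."""
    handled = []
    await subscriber.start()
    try:
        async for msg in subscriber:
            handled.append(msg.upper())  # placeholder for real processing
            if len(handled) >= limit:
                break  # leave the loop so the subscriber can be stopped
    finally:
        await subscriber.stop()
    return handled


if __name__ == "__main__":
    sub = FakeSubscriber(["a", "b", "c"])
    print(asyncio.run(consume_batch(sub, limit=2)))
```

The try/finally block is a design choice rather than a requirement: it keeps the stop() call reachable whether the loop ends through break or through an exception raised while processing.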

Technical Details

Both ways support all FastStream features, such as middlewares, OpenTelemetry tracing and Prometheus metrics.

Acknowledgement#

Note that the default FastStream acknowledgement logic does not work here. You will need to acknowledge a consumed message manually.

msg = await subscriber.get_one()
if msg is not None:  # get_one() may return None if no message arrived
    await msg.ack()

The same applies when iterating:

async for msg in subscriber:
    await msg.ack()
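
Since acknowledgement is manual here, it is worth deciding explicitly what happens when processing fails. The sketch below acknowledges a message only after it has been processed and negatively acknowledges it otherwise. It rests on assumptions: `handle` is a hypothetical helper, `FakeMessage` is a stand-in used so the example runs without a broker, and it presumes your message type offers a nack() method alongside ack(). What a negative acknowledgement actually does (redelivery, dead-lettering, nothing) depends on the broker.

```python
import asyncio


class FakeMessage:
    """Stand-in for a consumed message (hypothetical, not a FastStream class)."""

    def __init__(self, body):
        self.body = body
        self.state = "pending"

    async def ack(self):
        self.state = "acked"

    async def nack(self):
        self.state = "nacked"


async def handle(msg, process):
    """Run `process` on the message body, then ack on success or nack on failure."""
    try:
        result = process(msg.body)
    except Exception:
        await msg.nack()
        return None
    await msg.ack()
    return result


async def main():
    good, bad = FakeMessage("42"), FakeMessage("oops")
    # int("oops") raises ValueError, so the second message is nacked.
    results = [await handle(m, int) for m in (good, bad)]
    return results, good.state, bad.state


if __name__ == "__main__":
    print(asyncio.run(main()))
```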