Sometimes, you need to process messages as they arrive. You may not know the source of the messages at startup. They could be sent to the service later: via an incoming message, request, or even generated randomly as a temporary queue for processing the response.
In these cases, you cannot use the regular FastStream's@broker.subscriber() decorators.
However, the framework still allows you to do so in a suitable manner.
Warning
Dynamic subscribers are not supported by TestBroker.
broker=KafkaBroker()asyncwithTestKafkaBroker(broker)asbr:subscriber=br.subscriber("test-topic",persistent=False)awaitsubscriber.start()message=awaitsubscriber.get_one()# does not workawaitsubscriber.stop()
broker=KafkaBroker()asyncwithTestKafkaBroker(broker)asbr:subscriber=br.subscriber("test-topic",persistent=False)awaitsubscriber.start()message=awaitsubscriber.get_one()# does not workawaitsubscriber.stop()
broker=RabbitBroker()asyncwithTestRabbitBroker(broker)asbr:subscriber=br.subscriber("test-queue",persistent=False)awaitsubscriber.start()message=awaitsubscriber.get_one()# does not workawaitsubscriber.stop()
broker=NatsBroker()asyncwithTestNatsBroker(broker)asbr:subscriber=br.subscriber("test-subject",persistent=False)awaitsubscriber.start()message=awaitsubscriber.get_one()# does not workawaitsubscriber.stop()
broker=RedisBroker()asyncwithTestRedisBroker(broker)asbr:subscriber=br.subscriber("test-channel",persistent=False)awaitsubscriber.start()message=awaitsubscriber.get_one()# does not workawaitsubscriber.stop()
fromfaststream.kafkaimportKafkaBroker,KafkaMessageasyncdefmain():asyncwithKafkaBroker()asbroker:# connect the brokersubscriber=broker.subscriber("test-topic",persistent=False)awaitsubscriber.start()message:KafkaMessage|None=awaitsubscriber.get_one(timeout=3.0)awaitsubscriber.stop()
Important
Do not forget to start and stop subscriber manually
fromfaststream.confluentimportKafkaBroker,KafkaMessageasyncdefmain():asyncwithKafkaBroker()asbroker:# connect the brokersubscriber=broker.subscriber("test-topic",persistent=False)awaitsubscriber.start()message:KafkaMessage|None=awaitsubscriber.get_one(timeout=3.0)awaitsubscriber.stop()
Important
Do not forget to start and stop subscriber manually
fromfaststream.rabbitimportRabbitBroker,RabbitMessageasyncdefmain():asyncwithRabbitBroker()asbroker:# connect the brokersubscriber=broker.subscriber("test-queue",persistent=False)awaitsubscriber.start()message:RabbitMessage|None=awaitsubscriber.get_one(timeout=3.0)awaitsubscriber.stop()
Important
Do not forget to start and stop subscriber manually
fromfaststream.natsimportNatsBroker,NatsMessageasyncdefmain():asyncwithNatsBroker()asbroker:# connect the brokersubscriber=broker.subscriber("test-subject",persistent=False)awaitsubscriber.start()message:NatsMessage|None=awaitsubscriber.get_one(timeout=3.0)awaitsubscriber.stop()
Important
Do not forget to start and stop subscriber manually
fromfaststream.redisimportRedisBroker,RedisChannelMessageasyncdefmain():asyncwithRedisBroker()asbroker:# connect the brokersubscriber=broker.subscriber("test-channel",persistent=False)awaitsubscriber.start()message:RedisChannelMessage|None=awaitsubscriber.get_one(timeout=3.0)awaitsubscriber.stop()
Important
Do not forget to start and stop subscriber manually
fromfaststream.kafkaimportKafkaBroker,KafkaMessageasyncdefmain():asyncwithKafkaBroker()asbroker:subscriber=broker.subscriber("test-topic",persistent=False)awaitsubscriber.start()asyncformsginsubscriber:# msg is KafkaMessage type...# do message processawaitsubscriber.stop()
fromfaststream.confluentimportKafkaBroker,KafkaMessageasyncdefmain():asyncwithKafkaBroker()asbroker:subscriber=broker.subscriber("test-topic",persistent=False)awaitsubscriber.start()asyncformsginsubscriber:# msg is KafkaMessage type...# do message processawaitsubscriber.stop()
fromfaststream.rabbitimportRabbitBroker,RabbitMessageasyncdefmain():asyncwithRabbitBroker()asbroker:subscriber=broker.subscriber("test-queue",persistent=False)awaitsubscriber.start()asyncformsginsubscriber:# msg is RabbitMessage type...# do message processawaitsubscriber.stop()
fromfaststream.natsimportNatsBroker,NatsMessageasyncdefmain():asyncwithNatsBroker()asbroker:subscriber=broker.subscriber("test-subject",persistent=False)awaitsubscriber.start()asyncformsginsubscriber:# msg is NatsMessage type...# do message processawaitsubscriber.stop()
fromfaststream.redisimportRedisBroker,RedisMessageasyncdefmain():asyncwithRedisBroker()asbroker:subscriber=broker.subscriber("test-channel",persistent=False)awaitsubscriber.start()asyncformsginsubscriber:# msg is RedisMessage type...# do message processawaitsubscriber.stop()