Request / response over MQTT
FastStream mirrors the RabbitMQ RPC style: await broker.request(...) blocks until a reply arrives, and subscribers that return a value publish the response to the caller’s reply address.
MQTT 5.0 (recommended)
Set version="5.0". The producer uses zmqtt’s request(), which allocates a private response topic, subscribes, publishes with Correlation Data, and waits for one matching PUBLISH.
- You usually do not pass reply_to; it is generated for you.
- correlation_id and headers are supported on the request and are carried in PublishProperties.
The handler side receives MQTTMessage.reply_to and correlation_id populated from Response Topic and Correlation Data. When your handler returns a value (or a Response object), FastStream publishes the reply to that response topic with the same correlation.
from faststream import FastStream
from faststream.mqtt import MQTTBroker, MQTTMessage, QoS

broker = MQTTBroker("localhost", version="5.0")
app = FastStream(broker)


@broker.subscriber("rpc/echo", qos=QoS.AT_LEAST_ONCE)
async def echo(body: str) -> str:
    return body


@app.after_startup
async def demo() -> None:
    reply: MQTTMessage = await broker.request("hello", "rpc/echo", timeout=5.0)
    assert reply.body  # decoded payload
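To make the MQTT 5.0 flow described above more concrete, here is a toy in-memory model of it, using only the standard library. It is a sketch under stated assumptions, not FastStream's or zmqtt's actual implementation: ToyBus, Publish, request, and serve are hypothetical names invented for illustration, topics are matched exactly (no wildcards), and nothing touches the network. It only shows the shape of the exchange: the requester picks a private response topic and correlation value, subscribes, publishes, and waits for one matching message, while the responder echoes the correlation back on the response topic.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Publish:
    """Toy PUBLISH packet carrying the two MQTT 5.0 request/response properties."""
    topic: str
    payload: bytes
    response_topic: Optional[str] = None       # models the Response Topic property
    correlation_data: Optional[bytes] = None   # models the Correlation Data property


Callback = Callable[[Publish], Awaitable[None]]


class ToyBus:
    """Hypothetical in-memory stand-in for a broker (exact topic match only)."""

    def __init__(self) -> None:
        self._subs = {}  # topic -> list of callbacks

    def subscribe(self, topic: str, callback: Callback) -> None:
        self._subs.setdefault(topic, []).append(callback)

    def unsubscribe(self, topic: str, callback: Callback) -> None:
        self._subs[topic].remove(callback)

    async def publish(self, msg: Publish) -> None:
        for callback in list(self._subs.get(msg.topic, [])):
            await callback(msg)


async def request(bus: ToyBus, topic: str, payload: bytes, timeout: float = 5.0) -> Publish:
    """Requester side: private response topic, correlation value, wait for one match."""
    response_topic = f"reply/{uuid.uuid4().hex}"
    correlation = uuid.uuid4().bytes
    reply = asyncio.get_running_loop().create_future()

    async def on_reply(msg: Publish) -> None:
        # Accept only the first message whose correlation matches this request.
        if msg.correlation_data == correlation and not reply.done():
            reply.set_result(msg)

    bus.subscribe(response_topic, on_reply)
    try:
        await bus.publish(Publish(topic, payload, response_topic, correlation))
        return await asyncio.wait_for(reply, timeout)
    finally:
        bus.unsubscribe(response_topic, on_reply)


def serve(bus: ToyBus, topic: str, handler: Callable[[bytes], bytes]) -> None:
    """Responder side: publish the handler's return value to the response topic."""

    async def on_request(msg: Publish) -> None:
        result = handler(msg.payload)
        if msg.response_topic is not None:
            await bus.publish(
                Publish(msg.response_topic, result, correlation_data=msg.correlation_data)
            )

    bus.subscribe(topic, on_request)
```

In this model, calling serve(bus, "rpc/echo", lambda b: b) and then awaiting request(bus, "rpc/echo", b"hello") returns a Publish whose payload is b"hello" and whose correlation_data equals the value the requester generated.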
MQTTResponse
For MQTT-specific reply options, return MQTTResponse from faststream.mqtt.response to set qos and retain on the outgoing reply (in addition to body, headers, and correlation_id).
MQTT 3.1.1
MQTT 3.1.1 has no Response Topic or Correlation Data properties. FastStream instead requires a stable reply topic known to both sides:
- Pass reply_to="my/shared/reply/topic" to broker.request(...).
- The client subscribes to that topic, publishes the request, and waits for the first message on the reply topic.
- Your service must publish the response to reply_to (for example with @broker.publisher or explicit publish).
If reply_to is omitted, request() raises FeatureNotSupportedException.
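The MQTT 3.1.1 pattern above can be sketched with a similar toy model, again using only the standard library. Everything here is an assumption made for illustration rather than FastStream's code: ToyBus, request_v311, and serve_v311 are hypothetical names, and ReplyTopicRequired is a local stand-in for the FeatureNotSupportedException mentioned above. The sketch shows the three steps (subscribe to the shared reply topic, publish, take the first message) and the failure when reply_to is missing.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Callback = Callable[[bytes], Awaitable[None]]


class ReplyTopicRequired(Exception):
    """Local stand-in for FeatureNotSupportedException (illustration only)."""


class ToyBus:
    """Hypothetical in-memory stand-in for a broker (exact topic match only)."""

    def __init__(self) -> None:
        self._subs = {}  # topic -> list of callbacks

    def subscribe(self, topic: str, callback: Callback) -> None:
        self._subs.setdefault(topic, []).append(callback)

    def unsubscribe(self, topic: str, callback: Callback) -> None:
        self._subs[topic].remove(callback)

    async def publish(self, topic: str, payload: bytes) -> None:
        for callback in list(self._subs.get(topic, [])):
            await callback(payload)


async def request_v311(
    bus: ToyBus,
    topic: str,
    payload: bytes,
    reply_to: Optional[str] = None,
    timeout: float = 5.0,
) -> bytes:
    """Requester side: needs an agreed reply topic, returns the first message on it."""
    if reply_to is None:
        # No Response Topic property exists, so there is nothing to generate.
        raise ReplyTopicRequired("pass reply_to when using the 3.1.1-style flow")

    first = asyncio.get_running_loop().create_future()

    async def on_reply(data: bytes) -> None:
        # No correlation value is available: the first message wins.
        if not first.done():
            first.set_result(data)

    bus.subscribe(reply_to, on_reply)
    try:
        await bus.publish(topic, payload)
        return await asyncio.wait_for(first, timeout)
    finally:
        bus.unsubscribe(reply_to, on_reply)


def serve_v311(bus: ToyBus, topic: str, reply_to: str, handler: Callable[[bytes], bytes]) -> None:
    """Responder side: must already know the reply topic and publish to it explicitly."""

    async def on_request(data: bytes) -> None:
        await bus.publish(reply_to, handler(data))

    bus.subscribe(topic, on_request)
```

One consequence of this design, inferred from the description rather than stated by the source: because the requester accepts the first message on the shared topic, concurrent requests that reuse the same reply topic may receive each other's replies, so a distinct reply topic per in-flight request is likely the safer choice.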
Disabling automatic replies
Set @broker.subscriber(..., no_reply=True) if the handler should not send an RPC reply (same as RabbitMQ).