Message information and serialization#
FastStream wraps each incoming zmqtt.Message in MQTTMessage, which extends the generic StreamMessage.
Fields on MQTTMessage#
Typical fields used in handlers:
| Field | Meaning |
|---|---|
| body | Decoded payload (bytes or deserialized JSON / text depending on content_type and decoder). |
| headers | MQTT 5.0 User Properties as dict[str, str]. Empty for 3.1.1. |
| content_type | From MQTT 5.0 Content Type property, if present. |
| reply_to | Response Topic (MQTT 5.0), used for RPC replies. |
| correlation_id | Correlation Data decoded as text (MQTT 5.0). |
| raw_message | The original zmqtt.Message (topic, QoS, retain, properties). |
Access the message through a handler parameter, Context, or the Annotated shortcuts, the same as with other brokers.
from faststream.mqtt.annotations import MQTTMessage

@broker.subscriber("devices/+/status")
async def handle(msg: MQTTMessage):
    props = msg.raw_message.properties  # MQTT 5.0 only
    ...
Serialization pipeline#
Serialization follows the global FastStream rules (custom serialization):
- Encoding (publish): encode_message turns Python values into bytes and may set a logical content type (application/json, text/plain, etc.).
- MQTT 5.0: that content type is written to PublishProperties.content_type, and the payload is sent as raw bytes.
- Decoding (consume): MQTTParserV5.decode_message uses content_type and the body to decode JSON or text; MQTT 3.1.1 uses heuristics on the raw payload (JSON first, then UTF-8 text, else bytes).
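The encode and decode steps above can be illustrated with a small standalone sketch. It is not FastStream's implementation: encode_payload and decode_payload are hypothetical helper names, and the fallback order (JSON first, then UTF-8 text, else bytes) is taken from the description of the MQTT 3.1.1 heuristics.

```python
import json
from typing import Any, Optional, Tuple


def encode_payload(value: Any) -> Tuple[bytes, Optional[str]]:
    """Hypothetical stand-in for the encoding step: value -> (bytes, content type)."""
    if isinstance(value, bytes):
        return value, None  # raw bytes pass through without a content type
    if isinstance(value, str):
        return value.encode("utf-8"), "text/plain"
    # Everything else is assumed to be JSON-serializable in this sketch.
    return json.dumps(value).encode("utf-8"), "application/json"


def decode_payload(body: bytes, content_type: Optional[str] = None) -> Any:
    """Hypothetical stand-in for the decoding step."""
    if content_type == "application/json":
        return json.loads(body)
    if content_type == "text/plain":
        return body.decode("utf-8")
    # No content type (as on MQTT 3.1.1): JSON first, then UTF-8 text, else bytes.
    try:
        return json.loads(body)
    except ValueError:  # also covers UnicodeDecodeError raised inside json.loads
        pass
    try:
        return body.decode("utf-8")
    except UnicodeDecodeError:
        return body


payload, content_type = encode_payload({"temperature": 21.5})
restored = decode_payload(payload, content_type)
```

One consequence of guessing without a content type is that a text payload such as 42 comes back as an integer in this sketch, because the JSON attempt succeeds first. An explicit content type avoids that ambiguity.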
User Properties as application headers#
For MQTT 5.0, headers in FastStream are User Properties on the PUBLISH packet. They are string key/value pairs only; if you need binary metadata, encode it (for example, as Base64) or put it in the payload.
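As a minimal sketch of the Base64 approach, the helpers below store binary metadata in a string-only headers dict and read it back. The names pack_binary_header and unpack_binary_header, and the signature key, are hypothetical and used only for illustration; only the standard library is involved.

```python
import base64
from typing import Dict


def pack_binary_header(headers: Dict[str, str], key: str, value: bytes) -> Dict[str, str]:
    """Return a copy of headers with value stored as Base64 text under key."""
    packed = dict(headers)
    packed[key] = base64.b64encode(value).decode("ascii")
    return packed


def unpack_binary_header(headers: Dict[str, str], key: str) -> bytes:
    """Decode a Base64 header value back into the original bytes."""
    return base64.b64decode(headers[key])


# Publisher side: every value stays a str, as User Properties require.
headers = pack_binary_header({"device": "sensor-1"}, "signature", b"\x00\xff\x10")

# Consumer side: recover the original bytes from the received headers.
signature = unpack_binary_header(headers, "signature")
```

Base64 grows the data by roughly a third, so larger binary blobs are usually better placed in the payload.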
Custom or framework-specific metadata should use headers (User Properties). Protocol-level fields such as Response Topic, Correlation Data, and Content Type are exposed as dedicated attributes on MQTTMessage, not duplicated inside headers.
Advanced: direct property access#
Anything not exposed on MQTTMessage can still be read from msg.raw_message.properties (a PublishProperties instance) on MQTT 5.0, for example message expiry interval, topic alias, or additional spec fields supported by zmqtt.
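The sketch below shows one defensive way to read such a field. It uses stand-in objects instead of real zmqtt messages, so it runs on its own. The helper read_property is hypothetical, the attribute name message_expiry_interval is an assumption about how the property is spelled, and treating properties as None when no MQTT 5.0 properties exist is also an assumption.

```python
from types import SimpleNamespace
from typing import Any, Optional


def read_property(msg: Any, name: str, default: Optional[Any] = None) -> Any:
    """Read one field from msg.raw_message.properties, tolerating its absence."""
    props = getattr(msg.raw_message, "properties", None)
    if props is None:  # assumed: no properties object without MQTT 5.0
        return default
    return getattr(props, name, default)


# Stand-in objects for illustration only; real messages come from the broker.
v5_msg = SimpleNamespace(
    raw_message=SimpleNamespace(
        properties=SimpleNamespace(message_expiry_interval=30)  # assumed attribute name
    )
)
v3_msg = SimpleNamespace(raw_message=SimpleNamespace(properties=None))

expiry = read_property(v5_msg, "message_expiry_interval")
missing = read_property(v3_msg, "message_expiry_interval", default=0)
```

Going through a helper like this keeps handlers working when the same subscriber serves both protocol versions. Check the actual attribute names against the PublishProperties class in zmqtt before relying on them.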