Message information and serialization

FastStream wraps each incoming zmqtt.Message in MQTTMessage, which extends the generic StreamMessage.

Fields on MQTTMessage

Typical fields used in handlers:

| Field | Meaning |
| --- | --- |
| body | Decoded payload (bytes or deserialized JSON / text depending on content_type and decoder). |
| headers | MQTT 5.0 User Properties as dict[str, str]. Empty for 3.1.1. |
| content_type | From MQTT 5.0 Content Type property, if present. |
| reply_to | Response Topic (MQTT 5.0), used for RPC replies. |
| correlation_id | Correlation Data decoded as text (MQTT 5.0). |
| raw_message | The original zmqtt.Message (topic, QoS, retain, properties). |

Access the message through a typed handler parameter, Context, or the Annotated shortcuts, the same way as with other brokers.

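from faststream.mqtt.annotations import MQTTMessage


# `broker` is assumed to be an MQTT broker instance created elsewhere.
@broker.subscriber("devices/+/status")
async def handle(msg: MQTTMessage) -> None:
    # Properties are only populated on MQTT 5.0 connections.
    props = msg.raw_message.properties
    print(props)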

Topic Path Access

MQTT topic filters support + (single level) and # (multi level) wildcards. FastStream lets you capture single-level matches by naming them in the subscriber topic template and reading them back via Path (a shortcut for Context("message.path.*")).

| Syntax | Replaces | Captures | Placement constraint |
| --- | --- | --- | --- |
| "{name}", f"{{name}}" | + | One topic level as str | Must occupy a whole topic level (surrounded by / or string boundaries). |
| "{{name}}", f"{{{{name}}}}" | {name} | No capture | Allows braces to be treated as literal characters. |

Single-level capture

from faststream import Path


# `broker` is assumed to be an MQTT broker instance created elsewhere.
@broker.subscriber("/devices/{device_id}/temperature")
async def on_temperature(
    body: str,
    device_id: str = Path(),  # the topic level matched by {device_id}
) -> None:
    print(device_id, body)
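
To make the template syntax more concrete, here is a minimal sketch of how a topic template could be translated into an MQTT filter and a capture pattern. It is not FastStream's implementation: `compile_template` and `extract_path` are hypothetical helpers built on the standard library only. The sketch assumes the template contains just literal levels, `{name}` levels, and doubled braces (raw `+` and `#` are not handled).

```python
from __future__ import annotations

import re

# Hypothetical helpers for illustration only; not FastStream's implementation.
_PARAM = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")


def compile_template(template: str) -> tuple[str, re.Pattern[str]]:
    """Translate a topic template into an MQTT filter and a capture regex."""
    filter_levels: list[str] = []
    regex_levels: list[str] = []
    for level in template.split("/"):
        param = _PARAM.fullmatch(level)
        if param:
            # A whole-level "{name}" subscribes as "+" and captures that level.
            filter_levels.append("+")
            regex_levels.append(f"(?P<{param.group(1)}>[^/]+)")
        else:
            # Doubled braces are escapes for literal "{" and "}".
            literal = level.replace("{{", "{").replace("}}", "}")
            filter_levels.append(literal)
            regex_levels.append(re.escape(literal))
    return "/".join(filter_levels), re.compile("/".join(regex_levels))


def extract_path(template: str, topic: str) -> dict[str, str] | None:
    """Return the captured levels for a concrete topic, or None if no match."""
    _, pattern = compile_template(template)
    match = pattern.fullmatch(topic)
    return match.groupdict() if match else None
```

Under these assumptions, `compile_template("/devices/{device_id}/temperature")` yields the filter `/devices/+/temperature`, and `extract_path` returns the values a handler would otherwise receive through Path.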

Literal braces

MQTT topics may contain { and } as regular characters. Escape them by doubling braces so FastStream does not treat them as path parameters:

@broker.subscriber("/root/{{braced}}")
async def handle(body: str) -> None:
    print(body)

For f-strings, double the escaping because Python consumes one brace level first:

prefix = "/root"


# Python reduces "{{{{" to "{{", which FastStream then reads as a literal "{".
@broker.subscriber(f"{prefix}/{{{{braced}}}}/status")
async def handle_status(body: str) -> None:
    print(body)

The first example subscribes to the literal MQTT topic /root/{braced}; the second subscribes to /root/{braced}/status.
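
The two escaping levels can be checked in plain Python, without a broker. The first step below is ordinary f-string behaviour. The second step only simulates the unescaping that FastStream is described as performing; the string replacement is an assumption made for illustration, not the library's code.

```python
prefix = "/root"

# Step 1: Python consumes one brace level while evaluating the f-string.
template = f"{prefix}/{{{{braced}}}}/status"
print(template)  # /root/{{braced}}/status

# Step 2 (simulated): doubled braces in the template become literal braces
# in the topic that is actually subscribed to.
literal_topic = template.replace("{{", "{").replace("}}", "}")
print(literal_topic)  # /root/{braced}/status
```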

Multi-level topics

Multi-level (#) subscriptions are supported as raw MQTT topic filters, but the levels they match are not captured through Path. Use MQTTMessage.raw_message.topic when you need the full topic.

from faststream.mqtt.annotations import MQTTMessage

@broker.subscriber("/devices/+/logs/#")
async def on_logs(
    msg: MQTTMessage,
) -> None:
    print(msg.raw_message.topic)
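
Because the levels matched by # are not exposed through Path, a handler that needs them has to derive them from the full topic. The following is a rough sketch of one way to do that. `match_tail` is a hypothetical helper, not part of FastStream or zmqtt. It assumes a well-formed filter where # appears only as the last level, and it ignores the special rules for topics that start with $.

```python
from __future__ import annotations


def match_tail(topic_filter: str, topic: str) -> list[str] | None:
    """Return the levels matched by a trailing "#", or None if no match.

    Filters without "#" return an empty list when they match.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # "#" matches the parent level and any number of child levels.
            return topic_levels[index:]
        if index >= len(topic_levels):
            return None
        if level != "+" and level != topic_levels[index]:
            return None
    # Without "#", both sides must have the same number of levels.
    return [] if len(filter_levels) == len(topic_levels) else None
```

Inside the handler above, `match_tail("/devices/+/logs/#", msg.raw_message.topic)` would give the trailing levels as a list of strings.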

Validation

Templates that violate MQTT topic rules are rejected at subscriber creation with SetupError:

  • "/pre{name}/x" or "/{name}post/x": {name} does not occupy a whole topic level.
  • "/{id}/x/{id}": duplicated parameter name.

Raw MQTT + and # wildcards may be used alongside captured {name} levels. Only named single-level parameters are captured; use MQTTMessage.raw_message.topic for full-topic access when # is involved.
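
The two rules listed above can be expressed as a short check. This is only a sketch of the idea: `validate_template` and `TemplateError` are hypothetical names, with `TemplateError` standing in for FastStream's SetupError, and the real validation may cover more cases.

```python
import re


class TemplateError(ValueError):
    """Stand-in for FastStream's SetupError in this sketch."""


_NAME = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")


def validate_template(template: str) -> list[str]:
    """Return the parameter names in order, or raise TemplateError."""
    seen: list[str] = []
    for level in template.split("/"):
        # Drop escaped braces first so "{{name}}" is not read as a parameter.
        unescaped = level.replace("{{", "").replace("}}", "")
        names = _NAME.findall(unescaped)
        if not names:
            continue
        if len(names) > 1 or unescaped != "{" + names[0] + "}":
            raise TemplateError(f"{level!r} must be a whole topic level")
        if names[0] in seen:
            raise TemplateError(f"duplicated parameter name {names[0]!r}")
        seen.append(names[0])
    return seen
```

With this sketch, `validate_template("/devices/{device_id}/temperature")` returns `["device_id"]`, while each of the rejected templates listed above raises `TemplateError`.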

Serialization pipeline

Serialization follows the global FastStream rules (see custom serialization):

  1. Encoding (publish): encode_message turns Python values into bytes and may set a logical content type (application/json, text/plain, etc.).
  2. MQTT 5.0: that content type is written to PublishProperties.content_type, and the payload is sent as raw bytes.
  3. Decoding (consume): MQTTParserV5.decode_message uses content_type and the body to decode JSON or text; MQTT 3.1.1 uses heuristics on the raw payload (JSON first, then UTF-8 text, else bytes).
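
The steps above can be approximated with a few lines of standard-library Python. This is a simplified model, not the code behind encode_message or MQTTParserV5.decode_message: the `encode` and `decode` names, the choice of content types per Python type, and the handling of unknown content types are all assumptions made for the sketch.

```python
from __future__ import annotations

import json
from typing import Any


def encode(value: Any) -> tuple[bytes, str | None]:
    """Turn a Python value into (payload, logical content type)."""
    if isinstance(value, bytes):
        return value, None
    if isinstance(value, str):
        return value.encode("utf-8"), "text/plain"
    return json.dumps(value).encode("utf-8"), "application/json"


def decode(payload: bytes, content_type: str | None = None) -> Any:
    """Decode a payload, using the content type when one is available."""
    if content_type == "application/json":
        return json.loads(payload)
    if content_type is not None and content_type.startswith("text/"):
        return payload.decode("utf-8")
    if content_type is not None:
        return payload  # assumption: unknown types stay as raw bytes
    # No content type (as on MQTT 3.1.1): JSON first, then UTF-8 text, else bytes.
    try:
        return json.loads(payload)
    except ValueError:  # includes JSONDecodeError and UnicodeDecodeError
        pass
    try:
        return payload.decode("utf-8")
    except UnicodeDecodeError:
        return payload
```

One consequence of the JSON-first heuristic is that a payload such as `b"42"` arrives as the integer 42 rather than as text when no content type is present. Publishing over MQTT 5.0 with an explicit content type avoids that ambiguity.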

User Properties as application headers

For MQTT 5.0, headers in FastStream are User Properties on the PUBLISH packet. They are string key/value pairs only. If you need binary metadata, encode it as text (for example, with Base64) or carry it in the payload.
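
As an illustration of the Base64 option, the sketch below packs a binary value into a string that fits the dict[str, str] shape of headers and recovers it on the consuming side. The helper names and the x-signature header key are hypothetical; nothing here is specific to FastStream.

```python
import base64


def pack_binary_header(value: bytes) -> str:
    """Encode bytes as ASCII text suitable for a User Property value."""
    return base64.b64encode(value).decode("ascii")


def unpack_binary_header(value: str) -> bytes:
    """Decode a Base64 header value, rejecting malformed input."""
    return base64.b64decode(value, validate=True)


# Hypothetical header name; any string key works for User Properties.
headers = {"x-signature": pack_binary_header(b"\x00\xffsig")}
restored = unpack_binary_header(headers["x-signature"])
```

Base64 grows the value by roughly a third, so larger binary blobs are usually better placed in the payload.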

Custom or framework-specific metadata should use headers (User Properties). Protocol-level fields such as Response Topic, Correlation Data, and Content Type are exposed as dedicated attributes on MQTTMessage, not duplicated inside headers.

Advanced: direct property access

Anything not exposed on MQTTMessage can still be read from msg.raw_message.properties (a PublishProperties instance) on MQTT 5.0, for example message expiry interval, topic alias, or additional spec fields supported by zmqtt.