Skip to content

Publishing over MQTT#

FastStream uses the same publishing patterns as in the getting started guide: await broker.publish(...), @broker.publisher(...), and publisher objects.

MQTTBroker.publish#

Key arguments:

Argument Description
message Body (SendableMessage): primitives, models, bytes, etc.
topic Target topic (must not contain + or #).
qos QoS.AT_MOST_ONCE (0), AT_LEAST_ONCE (1), or EXACTLY_ONCE (2).
retain If true, the broker stores the last message for new subscribers.
headers MQTT 5.0 only — maps to User Properties.
correlation_id MQTT 5.0 only — Correlation Data (for tracing or paired replies).
reply_to MQTT 5.0 only — Response Topic for request/reply style flows.

MQTT 3.1.1 rejects headers, correlation_id, and reply_to; use MQTT 5.0 for metadata on the wire.

from faststream import FastStream
from faststream.mqtt import MQTTBroker, MQTTMessage, QoS

broker = MQTTBroker("localhost", version="5.0")
app = FastStream(broker)


@broker.subscriber("devices/alerts")
async def handle_alert(payload: dict, msg: MQTTMessage) -> None:
    print(payload, msg.headers)


@app.after_startup
async def send_alert() -> None:
    await broker.publish(
        {"level": "warning"},
        "devices/alerts",
        qos=QoS.AT_LEAST_ONCE,
        retain=True,
        headers={"source": "docs"},
        correlation_id="alert-1",
    )

Publisher objects#

broker.publisher("topic", qos=..., retain=..., headers=...) returns an MQTTPublisher with the same semantics. Per-call publish() can override qos / retain / headers where applicable.

from faststream import FastStream
from faststream.mqtt import MQTTBroker, QoS

broker = MQTTBroker("localhost", version="5.0")
app = FastStream(broker)

events = broker.publisher(
    "devices/events",
    qos=QoS.AT_LEAST_ONCE,
    headers={"source": "sensor"},
)


@broker.subscriber("devices/commands")
async def handle_command(command: str) -> None:
    await events.publish({"command": command}, headers={"kind": "echo"})


@broker.subscriber("devices/events")
async def handle_event(event: dict) -> None:
    print(event)

Publisher decorator#

Decorated subscriber return values are published to the configured topic, the same as other FastStream brokers.

from faststream import FastStream
from faststream.mqtt import MQTTBroker

broker = MQTTBroker("localhost", version="5.0")
app = FastStream(broker)


@broker.publisher("processed")
@broker.subscriber("raw")
async def normalize(body: str) -> str:
    return body.upper()


@broker.subscriber("processed")
async def consume_processed(body: str) -> None:
    print(body)

Batching#

MQTT has no batch publish in FastStream — calling batch APIs raises FeatureNotSupportedException.