Skip to content

ZmqttProducerV311

faststream.mqtt.publisher.producer.ZmqttProducerV311 #

ZmqttProducerV311(
    parser: Optional[CustomCallable],
    decoder: Optional[CustomCallable],
)

Bases: ZmqttBaseProducer

Producer for MQTT 3.1.1 — publishes raw bytes only.

Headers, correlation_id, and other metadata are not supported. Use MQTT 5.0 for those features. Request/reply is supported via an explicit reply_to topic provided by the caller.

Source code in faststream/mqtt/publisher/producer.py
def __init__(
    self,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Set up the producer with the MQTT 3.1.1 message parser.

    Args:
        parser: Optional custom parser, forwarded to the base producer.
        decoder: Optional custom decoder, forwarded to the base producer.
    """
    # A fresh MQTTParserV311 is handed to the base class as its first
    # argument; the user-supplied callables follow unchanged.
    v311_parser = MQTTParserV311()
    super().__init__(v311_parser, parser, decoder)

serializer instance-attribute #

serializer: SerializerProto | None = None

publish async #

publish(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish(self, cmd: "MQTTPublishCommand") -> None:
    """Encode ``cmd.body`` and publish it to ``cmd.destination``.

    Args:
        cmd: Publish command providing the body, destination topic,
            QoS level, retain flag and (unsupported) headers.

    Raises:
        FeatureNotSupportedException: If ``cmd.headers`` is non-empty.
    """
    # Headers are rejected up front, before anything is encoded or sent.
    if cmd.headers:
        raise FeatureNotSupportedException(
            "MQTT 3.1.1 does not support message headers. Use MQTT 5.0."
        )

    # encode_message returns a pair; only the first item is sent here.
    encoded_body, _ = encode_message(cmd.body, self.serializer)
    client = self._connected_client
    await client.publish(
        cmd.destination,
        encoded_body,
        qos=zmqtt.QoS(cmd.qos),
        retain=cmd.retain,
    )

request async #

request(cmd: MQTTPublishCommand) -> Message

Request/reply for MQTT 3.1.1 via explicit reply topic.

The caller must supply cmd.reply_to. FastStream subscribes to that topic, publishes the raw request payload, then waits for the first message on the reply topic. The handler side must publish its response to the same topic (e.g. via @broker.publisher).

Source code in faststream/mqtt/publisher/producer.py
@override
async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message":
    """Request/reply for MQTT 3.1.1 via explicit reply topic.

    The caller must supply ``cmd.reply_to``.  FastStream subscribes to
    that topic, publishes the raw request payload, then waits for the
    first message on the reply topic.  The handler side must publish
    its response to the same topic (e.g. via ``@broker.publisher``).

    Args:
        cmd: Publish command providing the body, destination topic,
            QoS level, retain flag, reply topic and optional timeout.

    Returns:
        The first message received on ``cmd.reply_to``.

    Raises:
        FeatureNotSupportedException: If ``cmd.reply_to`` is empty.
        asyncio.TimeoutError: If no reply arrives within the timeout.
    """
    if not cmd.reply_to:
        msg = "MQTT 3.1.1 request() requires an explicit reply_to topic."
        raise FeatureNotSupportedException(msg)

    # Subscribe before publishing so a fast reply cannot be missed.
    sub = self._connected_client.subscribe(cmd.reply_to)
    await sub.start()

    try:
        payload, _ = encode_message(cmd.body, self.serializer)
        await self._connected_client.publish(
            cmd.destination,
            payload,
            # Coerce to the client's QoS type, exactly as publish() does,
            # so both code paths hand the client the same kind of value.
            qos=zmqtt.QoS(cmd.qos),
            retain=cmd.retain,
        )
        return await asyncio.wait_for(
            sub.get_message(),
            # Any falsy timeout (None or 0) falls back to 30 seconds.
            timeout=cmd.timeout or 30.0,
        )
    finally:
        # Always drop the temporary reply subscription, including on
        # publish failure or timeout.
        await sub.stop()

publish_batch async #

publish_batch(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish_batch(self, cmd: "MQTTPublishCommand") -> None:
    """Reject batch publishing; this producer never sends batches.

    Args:
        cmd: Publish command; it is not inspected.

    Raises:
        FeatureNotSupportedException: Always.
    """
    raise FeatureNotSupportedException(
        "MQTT does not support batch publishing."
    )

connect #

connect(
    client: MQTTClient,
    serializer: Optional[SerializerProto],
) -> None
Source code in faststream/mqtt/publisher/producer.py
def connect(
    self,
    client: "zmqtt.MQTTClient",
    serializer: Optional["SerializerProto"],
) -> None:
    """Attach a client and serializer to this producer.

    Args:
        client: MQTT client stored on ``self._client``.
        serializer: Optional serializer stored on ``self.serializer``.
    """
    # Both attributes are replaced together; nothing else is touched.
    self._client, self.serializer = client, serializer

disconnect #

disconnect() -> None
Source code in faststream/mqtt/publisher/producer.py
def disconnect(self) -> None:
    """Detach the client and serializer by resetting both to ``None``."""
    self._client = self.serializer = None