Skip to content

ZmqttProducerV5

faststream.mqtt.publisher.producer.ZmqttProducerV5 #

ZmqttProducerV5(
    parser: Optional[CustomCallable],
    decoder: Optional[CustomCallable],
)

Bases: ZmqttBaseProducer

Producer for MQTT 5.0 — publishes with PublishProperties.

Source code in faststream/mqtt/publisher/producer.py
def __init__(
    self,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Initialise the producer on top of a fresh ``MQTTParserV5``.

    Args:
        parser: Optional custom parser, forwarded to the base initialiser.
        decoder: Optional custom decoder, forwarded to the base initialiser.
    """
    v5_parser = MQTTParserV5()
    super().__init__(v5_parser, parser, decoder)

serializer instance-attribute #

serializer: SerializerProto | None = None

publish async #

publish(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish(self, cmd: "MQTTPublishCommand") -> None:
    """Publish ``cmd`` to its destination with MQTT 5.0 publish properties.

    The body is encoded with the producer's current serializer.  Headers
    are sent as user properties with their values converted to ``str``.
    The content type, reply topic and correlation ID are only attached
    when they are truthy; otherwise ``None`` is passed for them.
    """
    body, mime = encode_message(cmd.body, self.serializer)

    # An absent headers mapping is treated the same as an empty one.
    user_properties = tuple(
        (name, str(value)) for name, value in (cmd.headers or {}).items()
    )

    correlation_data = None
    if cmd.correlation_id:
        correlation_data = cmd.correlation_id.encode()

    publish_props = PublishProperties(
        content_type=mime or None,
        response_topic=cmd.reply_to or None,
        correlation_data=correlation_data,
        user_properties=user_properties,
        message_expiry_interval=cmd.message_expiry_interval,
    )

    await self._connected_client.publish(
        cmd.destination,
        body,
        qos=cmd.qos,
        retain=cmd.retain,
        properties=publish_props,
    )

request async #

request(cmd: MQTTPublishCommand) -> Message

Request/reply for MQTT 5.0 via zmqtt's native client.request().

zmqtt auto-generates a unique reply topic. We pass our correlation ID explicitly so the responder echoes it back and the caller can verify it on the response StreamMessage.

Source code in faststream/mqtt/publisher/producer.py
@override
async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message":
    """Send ``cmd`` as an MQTT 5.0 request and return the reply message.

    The round trip is delegated to zmqtt's native ``client.request()``,
    which auto-generates a unique reply topic; ``response_topic`` is
    therefore deliberately left unset.  Correlation data is always sent
    (the command's correlation ID, or a newly generated one when it has
    none) so the responder echoes it back and the caller can verify it on
    the response StreamMessage.

    The timeout falls back to ``30.0`` whenever ``cmd.timeout`` is falsy.
    """
    body, mime = encode_message(cmd.body, self.serializer)
    cor_id = cmd.correlation_id or gen_cor_id()

    # An absent headers mapping is treated the same as an empty one.
    user_properties = tuple(
        (name, str(value)) for name, value in (cmd.headers or {}).items()
    )

    # Only correlation data is supplied; the reply topic is left to zmqtt.
    request_props = PublishProperties(
        content_type=mime or None,
        correlation_data=cor_id.encode(),
        user_properties=user_properties,
        message_expiry_interval=cmd.message_expiry_interval,
    )

    reply = await self._connected_client.request(
        cmd.destination,
        body,
        qos=cmd.qos,
        timeout=cmd.timeout or 30.0,
        properties=request_props,
    )
    return reply

publish_batch async #

publish_batch(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish_batch(self, cmd: "MQTTPublishCommand") -> None:
    """Reject batch publishing.

    Raises:
        FeatureNotSupportedException: Always; ``cmd`` is never inspected.
    """
    reason = "MQTT does not support batch publishing."
    raise FeatureNotSupportedException(reason)

connect #

connect(
    client: MQTTClient,
    serializer: Optional[SerializerProto],
) -> None
Source code in faststream/mqtt/publisher/producer.py
def connect(
    self,
    client: "zmqtt.MQTTClient",
    serializer: Optional["SerializerProto"],
) -> None:
    """Attach the client and serializer this producer should use.

    Args:
        client: Client object stored on the producer as ``_client``.
        serializer: Serializer stored as ``serializer``; may be ``None``.
    """
    self._client, self.serializer = client, serializer

disconnect #

disconnect() -> None
Source code in faststream/mqtt/publisher/producer.py
def disconnect(self) -> None:
    """Drop the stored client and serializer by resetting both to ``None``."""
    self._client = self.serializer = None