Skip to content

MQTTParserV5

faststream.mqtt.parser.MQTTParserV5 #

Bases: MQTTBaseParser

Parser for MQTT 5.0 messages.

Extracts content_type, response_topic, correlation_data, and user_properties from PUBLISH properties when available.

parse_message async #

parse_message(msg: Message) -> MQTTMessage
Source code in faststream/mqtt/parser.py
async def parse_message(self, msg: zmqtt.Message) -> MQTTMessage:
    """Wrap a raw MQTT 5.0 message in an ``MQTTMessage``.

    When ``msg.properties`` is present, its content type, response topic,
    correlation data and user properties are copied onto the result.
    Otherwise the result carries empty headers, an empty ``reply_to`` and
    no content type or correlation id.

    Args:
        msg: The incoming message; its ``payload`` becomes the body.

    Returns:
        The populated ``MQTTMessage`` with ``msg`` kept as ``raw_message``.
    """
    publish_props = msg.properties

    if publish_props is None:
        # Nothing to extract: hand back the payload with blank metadata.
        return MQTTMessage(
            raw_message=msg,
            body=msg.payload,
            headers={},
            content_type=None,
            reply_to="",
            correlation_id=None,
        )

    declared_type: str | None = publish_props.content_type
    # A missing response topic is normalised to an empty string.
    response_topic: str = publish_props.response_topic or ""

    raw_correlation = publish_props.correlation_data
    # Correlation data arrives as bytes; undecodable bytes are replaced
    # rather than raising.
    correlation: str | None = (
        None
        if raw_correlation is None
        else raw_correlation.decode(errors="replace")
    )

    user_headers: dict[str, Any] = {}
    user_headers.update(publish_props.user_properties)

    return MQTTMessage(
        raw_message=msg,
        body=msg.payload,
        headers=user_headers,
        content_type=declared_type,
        reply_to=response_topic,
        correlation_id=correlation,
    )

decode_message async #

decode_message(msg: StreamMessage[Any]) -> DecodedMessage
Source code in faststream/mqtt/parser.py
async def decode_message(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
    """Decode ``msg`` by delegating to the module-level ``decode_message``.

    Args:
        msg: The parsed stream message to decode.

    Returns:
        Whatever the module-level ``decode_message`` helper returns.
    """
    decoded = decode_message(msg)
    return decoded