Skip to content

build_message

faststream.mqtt.testing.build_message #

build_message(
    message: SendableMessage,
    topic: str,
    *,
    version: Literal["3.1.1", "5.0"] = "5.0",
    qos: int = 0,
    retain: bool = False,
    reply_to: str = "",
    correlation_id: str | None = None,
    headers: dict[str, str] | None = None,
    serializer: Optional[SerializerProto] = None,
) -> Message

Build a fake zmqtt.Message from publish parameters.

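For MQTT 5.0, the metadata (content type, reply topic, correlation ID, and headers) is carried in PublishProperties so that MQTTParserV5 can extract it transparently. For MQTT 3.1.1, a plain message containing only the raw payload is returned; reply_to, correlation_id, and headers are not included.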

Source code in faststream/mqtt/testing.py
def build_message(
    message: "SendableMessage",
    topic: str,
    *,
    version: Literal["3.1.1", "5.0"] = "5.0",
    qos: int = 0,
    retain: bool = False,
    reply_to: str = "",
    correlation_id: str | None = None,
    headers: dict[str, str] | None = None,
    serializer: Optional["SerializerProto"] = None,
) -> zmqtt.Message:
    """Create a stand-in ``zmqtt.Message`` out of publish arguments.

    The payload and content type come from ``encode_message``. With MQTT 5.0
    the metadata travels in a ``zmqtt.PublishProperties`` object, which lets
    ``MQTTParserV5`` read it back transparently. With MQTT 3.1.1 the message
    carries the raw payload only.

    Args:
        message: Body to encode.
        topic: Topic stored on the resulting message.
        version: Protocol version. ``"3.1.1"`` yields a message without
            properties; every other value takes the 5.0 path.
        qos: Quality-of-service level, converted through ``zmqtt.QoS``.
        retain: Retain flag stored on the resulting message.
        reply_to: Response topic (5.0 only); an empty string becomes ``None``.
        correlation_id: Correlation ID (5.0 only), encoded with
            ``str.encode()``; ``None`` or an empty string becomes ``None``.
        headers: Mapping turned into user properties (5.0 only).
        serializer: Optional serializer forwarded to ``encode_message``.

    Returns:
        The assembled ``zmqtt.Message``.
    """
    body, mime_type = encode_message(message, serializer=serializer)

    if version != "3.1.1":
        # Headers become an immutable sequence of (key, value) pairs.
        header_pairs: tuple[tuple[str, str], ...] = (
            tuple(headers.items()) if headers else ()
        )

        # Falsy metadata (empty string / None) is normalised to None.
        correlation_bytes = None
        if correlation_id:
            correlation_bytes = correlation_id.encode()

        publish_props = zmqtt.PublishProperties(
            content_type=mime_type or None,
            response_topic=reply_to or None,
            correlation_data=correlation_bytes,
            user_properties=header_pairs,
        )

        return zmqtt.Message(
            topic=topic,
            payload=body,
            qos=zmqtt.QoS(qos),
            retain=retain,
            properties=publish_props,
        )

    # MQTT 3.1.1 has no properties: only the payload and delivery flags are set.
    return zmqtt.Message(
        topic=topic,
        payload=body,
        qos=zmqtt.QoS(qos),
        retain=retain,
    )