Skip to content

build_message

faststream.confluent.testing.build_message

build_message(
    message,
    topic,
    *,
    correlation_id=None,
    partition=None,
    timestamp_ms=None,
    key=None,
    headers=None,
    reply_to="",
    serializer=None,
)

Build a mock confluent_kafka.Message for a sendable message.

Source code in faststream/confluent/testing.py
def build_message(
    message: "SendableMessage",
    topic: str,
    *,
    correlation_id: str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    key: bytes | str | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    serializer: Optional["SerializerProto"] = None,
) -> MockConfluentMessage:
    """Build a mock confluent_kafka.Message for a sendable message.

    Args:
        message: Payload to encode; passed to ``encode_message`` together
            with ``serializer``.
        topic: Topic name stored on the mock message.
        correlation_id: Value for the ``correlation_id`` header. A new one is
            generated with ``gen_cor_id()`` when it is ``None`` or empty.
        partition: Partition number; ``None`` falls back to ``0``.
        timestamp_ms: Message timestamp in milliseconds. Only ``None`` is
            replaced with the current UTC time, so an explicit ``0`` is kept.
        key: Message key; ``None`` (or any empty value) becomes ``b""``.
        headers: Extra headers. They are merged last, so they override the
            default ``content-type``, ``correlation_id`` and ``reply_to``
            entries on a key clash.
        reply_to: Value for the ``reply_to`` header.
        serializer: Optional serializer forwarded to ``encode_message``.

    Returns:
        A ``MockConfluentMessage`` with offset ``0`` and ``timestamp_type=1``.
    """
    msg, content_type = encode_message(message, serializer)

    # Defaults come first so caller-supplied headers win on duplicate keys.
    merged_headers = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or gen_cor_id(),
        "reply_to": reply_to,
        **(headers or {}),
    }

    # Check for None explicitly: 0 is a valid epoch timestamp and must not be
    # silently replaced by "now".
    if timestamp_ms is None:
        timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)

    # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
    return MockConfluentMessage(
        raw_msg=msg,
        topic=topic,
        key=key or b"",
        # Header values are str here and are encoded to bytes for the mock.
        headers=[(name, value.encode()) for name, value in merged_headers.items()],
        offset=0,
        partition=partition or 0,
        # NOTE(review): 1 presumably maps to TIMESTAMP_CREATE_TIME per the
        # linked confluent_kafka docs; confirm.
        timestamp_type=1,
        timestamp_ms=timestamp_ms,
    )