Skip to content

build_message

faststream.kafka.testing.build_message

build_message(
    message,
    topic,
    partition=None,
    timestamp_ms=None,
    key=None,
    headers=None,
    correlation_id=None,
    *,
    reply_to="",
    serializer,
)

Build a Kafka ConsumerRecord for a sendable message.

Source code in faststream/kafka/testing.py
def build_message(
    message: "SendableMessage",
    topic: str,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    key: bytes | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    *,
    reply_to: str = "",
    serializer: Optional["SerializerProto"],
) -> "ConsumerRecord":
    """Build a Kafka ConsumerRecord for a sendable message.

    Args:
        message: Payload handed to ``encode_message`` to produce the record value.
        topic: Topic name stored on the record.
        partition: Partition number; ``None`` falls back to ``0``.
        timestamp_ms: Record timestamp in milliseconds. ``None`` means "now"
            (current UTC time); an explicit ``0`` is kept as given.
        key: Record key; ``None`` or empty bytes become ``b""``.
        headers: Extra headers. They override the generated ``content-type``
            and ``correlation_id`` entries, and a ``reply_to`` entry given
            here takes precedence over the ``reply_to`` argument.
        correlation_id: Value for the ``correlation_id`` header; when it is
            empty or ``None``, ``gen_cor_id()`` supplies one.
        reply_to: Value for the ``reply_to`` header. It is applied only when
            non-empty and when ``headers`` does not already carry that key.
        serializer: Forwarded unchanged to ``encode_message``.

    Returns:
        A ``ConsumerRecord`` with ``offset=0`` whose header values are
        encoded to bytes.
    """
    msg, content_type = encode_message(message, serializer=serializer)

    k = key or b""

    # Caller-supplied headers are spread last so they win over the defaults.
    record_headers = {
        "content-type": content_type or "",
        "correlation_id": correlation_id or gen_cor_id(),
        **(headers or {}),
    }

    if reply_to:
        # Do not clobber a "reply_to" the caller already placed in ``headers``.
        record_headers.setdefault("reply_to", reply_to)

    # Compare against None (not truthiness) so an explicit timestamp of 0
    # is honoured instead of being replaced by the current time.
    if timestamp_ms is None:
        timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)

    return ConsumerRecord(
        value=msg,
        topic=topic,
        partition=partition or 0,
        key=k,
        serialized_key_size=len(k),
        serialized_value_size=len(msg),
        # Stand-in checksum: sum over the encoded payload
        # (presumably bytes, so this adds up the byte values).
        checksum=sum(msg),
        offset=0,
        headers=[(name, value.encode()) for name, value in record_headers.items()],
        # NOTE(review): 1 presumably selects a Kafka timestamp type - confirm
        # the intended meaning against the ConsumerRecord documentation.
        timestamp_type=1,
        timestamp=timestamp_ms,
    )