def build_message(
message: "SendableMessage",
topic: str,
*,
correlation_id: str | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
key: bytes | str | None = None,
headers: dict[str, str] | None = None,
reply_to: str = "",
serializer: Optional["SerializerProto"] = None,
) -> MockConfluentMessage:
"""Build a mock confluent_kafka.Message for a sendable message."""
msg, content_type = encode_message(message, serializer)
k = key or b""
headers = {
"content-type": content_type or "",
"correlation_id": correlation_id or gen_cor_id(),
"reply_to": reply_to,
**(headers or {}),
}
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
return MockConfluentMessage(
raw_msg=msg,
topic=topic,
key=k,
headers=[(i, j.encode()) for i, j in headers.items()],
offset=0,
partition=partition or 0,
timestamp_type=1,
timestamp_ms=timestamp_ms or int(datetime.now(timezone.utc).timestamp() * 1000),
)