Skip to content

KafkaResponse

faststream.kafka.KafkaResponse #

KafkaResponse(
    body: SendableMessage,
    *,
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    timestamp_ms: int | None = None,
    key: bytes | None = None,
)

Bases: Response

Kafka-specific response object for outgoing messages.

Can be used in two ways: (1) as the return value of a handler, to send a response message; or (2) directly in publish_batch(), to set per-message attributes (key, headers, etc.).

For publish operations, consider using the more semantic alias KafkaPublishMessage.

Source code in faststream/kafka/response.py
def __init__(
    self,
    body: "SendableMessage",
    *,
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    timestamp_ms: int | None = None,
    key: bytes | None = None,
) -> None:
    """Create a response holding a payload plus optional Kafka metadata.

    Args:
        body: Message payload; forwarded to the base initializer.
        headers: Optional message headers; forwarded to the base initializer.
        correlation_id: Optional correlation identifier; forwarded to the
            base initializer.
        timestamp_ms: Optional message timestamp, stored unchanged on the
            instance.
        key: Optional message key, stored unchanged on the instance.
    """
    # The generic fields are handled by the parent class.
    super().__init__(body=body, headers=headers, correlation_id=correlation_id)

    # Kafka-only fields are kept on this instance.
    self.key = key
    self.timestamp_ms = timestamp_ms

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

timestamp_ms instance-attribute #

timestamp_ms = timestamp_ms

key instance-attribute #

key = key

get_publish_key #

get_publish_key() -> bytes | None

Return the Kafka message key for publishing.

Source code in faststream/kafka/response.py
@override
def get_publish_key(self) -> bytes | None:
    """Return the Kafka message key for publishing."""
    return self.key

as_publish_command #

as_publish_command() -> KafkaPublishCommand
Source code in faststream/kafka/response.py
@override
def as_publish_command(self) -> "KafkaPublishCommand":
    """Build a publish command from the data stored on this response.

    The topic is passed as an empty string; this method does not select a
    destination itself.

    Returns:
        A ``KafkaPublishCommand`` with publish type ``PublishType.PUBLISH``
        carrying this response's body, headers, correlation id, key and
        timestamp.
    """
    payload = self.body
    message_key = self.key
    message_timestamp_ms = self.timestamp_ms

    return KafkaPublishCommand(
        payload,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
        # Kafka specific
        topic="",
        key=message_key,
        timestamp_ms=message_timestamp_ms,
    )