Skip to content

KafkaResponse

faststream.confluent.response.KafkaResponse #

KafkaResponse(
    body,
    *,
    headers=None,
    correlation_id=None,
    timestamp_ms=None,
    key=None,
)

Bases: Response

Source code in faststream/confluent/response.py
def __init__(
    self,
    body: "SendableMessage",
    *,
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    timestamp_ms: int | None = None,
    key: bytes | str | None = None,
) -> None:
    super().__init__(
        body=body,
        headers=headers,
        correlation_id=correlation_id,
    )

    self.timestamp_ms = timestamp_ms
    self.key = key

timestamp_ms instance-attribute #

timestamp_ms = timestamp_ms

key instance-attribute #

key = key

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

as_publish_command #

as_publish_command()
Source code in faststream/confluent/response.py
@override
def as_publish_command(self) -> "KafkaPublishCommand":
    return KafkaPublishCommand(
        self.body,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
        # Kafka specific
        topic="",
        key=self.key,
        timestamp_ms=self.timestamp_ms,
    )