Skip to content

KafkaPublishCommand

faststream.kafka.KafkaPublishCommand #

KafkaPublishCommand(
    message,
    /,
    *messages,
    topic,
    _publish_type,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    reply_to="",
    no_confirm=False,
    timeout=0.5,
)

Bases: BatchPublishCommand

Source code in faststream/kafka/response.py
def __init__(
    self,
    message: "SendableMessage",
    /,
    *messages: "SendableMessage",
    topic: str,
    _publish_type: PublishType,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: bool = False,
    timeout: float = 0.5,
) -> None:
    super().__init__(
        message,
        *messages,
        destination=topic,
        reply_to=reply_to,
        correlation_id=correlation_id,
        headers=headers,
        _publish_type=_publish_type,
    )

    self.key = key
    self.partition = partition
    self.timestamp_ms = timestamp_ms
    self.no_confirm = no_confirm

    # request option
    self.timeout = timeout

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

destination instance-attribute #

destination = destination

reply_to instance-attribute #

reply_to = reply_to

publish_type instance-attribute #

publish_type = _publish_type

batch_bodies property writable #

batch_bodies

extra_bodies instance-attribute #

extra_bodies = bodies

key instance-attribute #

key = key

partition instance-attribute #

partition = partition

timestamp_ms instance-attribute #

timestamp_ms = timestamp_ms

no_confirm instance-attribute #

no_confirm = no_confirm

timeout instance-attribute #

timeout = timeout

as_publish_command #

as_publish_command()

Method to transform handlers' Response result to DTO for publishers.

Source code in faststream/response/response.py
def as_publish_command(self) -> "PublishCommand":
    """Method to transform handlers' Response result to DTO for publishers."""
    return PublishCommand(
        body=self.body,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
    )

add_headers #

add_headers(headers, *, override=True)
Source code in faststream/response/response.py
def add_headers(
    self,
    headers: dict[str, Any],
    *,
    override: bool = True,
) -> None:
    if override:
        self.headers |= headers
    else:
        self.headers = headers | self.headers

from_cmd classmethod #

from_cmd(cmd, *, batch=False)
Source code in faststream/kafka/response.py
@classmethod
def from_cmd(
    cls,
    cmd: Union["PublishCommand", "KafkaPublishCommand"],
    *,
    batch: bool = False,
) -> "KafkaPublishCommand":
    if isinstance(cmd, KafkaPublishCommand):
        # NOTE: Should return a copy probably.
        return cmd

    body, extra_bodies = cls._parse_bodies(cmd.body, batch=batch)

    return cls(
        body,
        *extra_bodies,
        topic=cmd.destination,
        correlation_id=cmd.correlation_id,
        headers=cmd.headers,
        reply_to=cmd.reply_to,
        _publish_type=cmd.publish_type,
    )

headers_to_publish #

headers_to_publish()
Source code in faststream/kafka/response.py
def headers_to_publish(self) -> dict[str, str]:
    headers = {}

    if self.correlation_id:
        headers["correlation_id"] = self.correlation_id

    if self.reply_to:
        headers["reply_to"] = self.reply_to

    return headers | self.headers