Skip to content

PublishCommand

faststream.PublishCommand #

PublishCommand(
    body,
    *,
    _publish_type,
    reply_to="",
    destination="",
    correlation_id=None,
    headers=None,
)

Bases: Response

Source code in faststream/response/response.py
def __init__(
    self,
    body: Any,
    *,
    _publish_type: PublishType,
    reply_to: str = "",
    destination: str = "",
    correlation_id: str | None = None,
    headers: dict[str, Any] | None = None,
) -> None:
    """Initialize the command and record its routing attributes.

    Args:
        body: Forwarded to the parent initializer.
        _publish_type: Stored on the instance as ``publish_type``.
        reply_to: Stored on the instance as ``reply_to``.
        destination: Stored on the instance as ``destination``.
        correlation_id: Forwarded to the parent initializer.
        headers: Forwarded to the parent initializer.
    """
    # The parent initializer receives body, headers and correlation id.
    super().__init__(body, headers=headers, correlation_id=correlation_id)

    # Attributes specific to this class.
    self.destination, self.reply_to = destination, reply_to
    self.publish_type = _publish_type

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

destination instance-attribute #

destination = destination

reply_to instance-attribute #

reply_to = reply_to

publish_type instance-attribute #

publish_type = _publish_type

batch_bodies property #

batch_bodies

as_publish_command #

as_publish_command()

Method to transform handlers' Response result to DTO for publishers.

Source code in faststream/response/response.py
def as_publish_command(self) -> "PublishCommand":
    """Convert this response into the DTO consumed by publishers.

    Returns:
        A ``PublishCommand`` built from this object's ``body``, ``headers``
        and ``correlation_id``, with ``PublishType.PUBLISH`` as its publish
        type.
    """
    command = PublishCommand(
        body=self.body,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
    )
    return command

add_headers #

add_headers(headers, *, override=True)
Source code in faststream/response/response.py
def add_headers(
    self,
    headers: dict[str, Any],
    *,
    override: bool = True,
) -> None:
    """Merge ``headers`` into ``self.headers``.

    ``self.headers`` is always rebound to a new dict; the previously held
    dict object is never modified.

    Args:
        headers: Entries to merge in.
        override: When true, values from ``headers`` win on key conflicts;
            when false, values already present in ``self.headers`` are kept.
    """
    if override:
        # Build a fresh dict instead of updating in place: the current dict
        # may be the very object a caller handed in, and mutating it would
        # leak these entries to every other holder of that reference.
        merged = dict(self.headers)
        merged.update(headers)
        self.headers = merged
    else:
        self.headers = headers | self.headers

from_cmd classmethod #

from_cmd(cmd)
Source code in faststream/response/response.py
@classmethod
def from_cmd(cls, cmd: Self) -> Self:
    """Placeholder for constructing an instance from an existing command.

    This implementation does no work and always raises. Presumably
    subclasses are expected to override it -- TODO confirm.

    Args:
        cmd: The command to build from (unused here).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError