Skip to content

BatchPublishCommand

faststream.response.BatchPublishCommand #

BatchPublishCommand(
    body,
    /,
    *bodies,
    _publish_type,
    reply_to="",
    destination="",
    correlation_id=None,
    headers=None,
)

Bases: PublishCommand

Source code in faststream/response/response.py
def __init__(
    self,
    body: Any,
    /,
    *bodies: Any,
    _publish_type: PublishType,
    reply_to: str = "",
    destination: str = "",
    correlation_id: str | None = None,
    headers: dict[str, Any] | None = None,
) -> None:
    """Create a command from a first body plus any number of extra bodies.

    ``body`` and all keyword options are handed unchanged to the parent
    initializer; the remaining positional ``bodies`` are stored, as the
    tuple they arrive in, on ``self.extra_bodies``.
    """
    # Collect the options the parent initializer receives in one place.
    parent_options: dict[str, Any] = {
        "headers": headers,
        "correlation_id": correlation_id,
        "destination": destination,
        "reply_to": reply_to,
        "_publish_type": _publish_type,
    }
    super().__init__(body, **parent_options)

    # Everything after the first positional body is kept separately.
    self.extra_bodies = bodies

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

destination instance-attribute #

destination = destination

reply_to instance-attribute #

reply_to = reply_to

publish_type instance-attribute #

publish_type = _publish_type

extra_bodies instance-attribute #

extra_bodies = bodies

batch_bodies property writable #

batch_bodies

as_publish_command #

as_publish_command()

Transforms a handler's Response result into a PublishCommand, the DTO used by publishers.

Source code in faststream/response/response.py
def as_publish_command(self) -> "PublishCommand":
    """Build a ``PublishCommand`` DTO for publishers from this object.

    The new command reuses this object's ``body``, ``headers`` and
    ``correlation_id`` and is always created with ``PublishType.PUBLISH``.
    """
    command = PublishCommand(
        body=self.body,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
    )
    return command

add_headers #

add_headers(headers, *, override=True)
Source code in faststream/response/response.py
def add_headers(
    self,
    headers: dict[str, Any],
    *,
    override: bool = True,
) -> None:
    """Merge ``headers`` into ``self.headers``.

    With ``override=True`` (the default) the incoming values win on key
    clashes and the merge is applied with ``|=`` to the existing mapping.
    With ``override=False`` the existing values win and ``self.headers`` is
    rebound to a newly built mapping.
    """
    if not override:
        # Right-hand operand wins in a union, so current headers take priority.
        self.headers = headers | self.headers
        return

    self.headers |= headers

from_cmd classmethod #

from_cmd(cmd, *, batch=False)
Source code in faststream/response/response.py
@classmethod
def from_cmd(
    cls,
    cmd: "PublishCommand",
    *,
    batch: bool = False,
) -> "BatchPublishCommand":
    """Alternate constructor from an existing command; not implemented here.

    Raises:
        NotImplementedError: Always, regardless of the arguments passed.
    """
    raise NotImplementedError()