Skip to content

NatsPublishCommand

faststream.nats.response.NatsPublishCommand #

NatsPublishCommand(
    message,
    *,
    subject="",
    correlation_id=None,
    headers=None,
    reply_to="",
    stream=None,
    timeout=0.5,
    _publish_type,
)

Bases: PublishCommand

Source code in faststream/nats/response.py
def __init__(
    self,
    message: "SendableMessage",
    *,
    subject: str = "",
    correlation_id: str | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    stream: str | None = None,
    timeout: float = 0.5,
    _publish_type: PublishType,
) -> None:
    """Create a publish command for NATS.

    Args:
        message: Payload to publish; forwarded to the base class as ``body``.
        subject: Target subject; forwarded to the base class as
            ``destination``.
        correlation_id: Optional correlation identifier, forwarded as-is.
        headers: Optional message headers, forwarded as-is.
        reply_to: Reply subject, forwarded as-is.
        stream: Optional stream name, stored on the instance.
        timeout: Timeout value stored on the instance
            (presumably seconds -- TODO confirm against the producer).
        _publish_type: Kind of publish operation, forwarded to the base class.
    """
    # Everything broker-agnostic is delegated to the base command.
    super().__init__(
        body=message,
        destination=subject,
        correlation_id=correlation_id,
        headers=headers,
        reply_to=reply_to,
        _publish_type=_publish_type,
    )

    # Options that only this NATS command carries.
    self.stream, self.timeout = stream, timeout

stream instance-attribute #

stream = stream

timeout instance-attribute #

timeout = timeout

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

destination instance-attribute #

destination = destination

reply_to instance-attribute #

reply_to = reply_to

publish_type instance-attribute #

publish_type = _publish_type

batch_bodies property #

batch_bodies

headers_to_publish #

headers_to_publish(*, js=False)
Source code in faststream/nats/response.py
def headers_to_publish(self, *, js: bool = False) -> dict[str, str]:
    """Build the header mapping to send along with the message.

    A ``correlation_id`` header is generated when the command has a truthy
    correlation id. A ``reply_to`` header is generated only when ``js`` is
    true and the command has a truthy ``reply_to``
    (``js`` presumably means JetStream publishing -- TODO confirm).

    The command's own ``headers`` are merged on top of the generated ones,
    so on a key collision the value from ``self.headers`` wins.

    Returns:
        A new dict; ``self.headers`` itself is not modified.
    """
    generated: dict[str, str] = (
        {"correlation_id": self.correlation_id} if self.correlation_id else {}
    )

    if js and self.reply_to:
        generated["reply_to"] = self.reply_to

    # Right-hand operand of `|` takes precedence on duplicate keys.
    return generated | self.headers

from_cmd classmethod #

from_cmd(cmd)
Source code in faststream/nats/response.py
@classmethod
def from_cmd(
    cls,
    cmd: Union["PublishCommand", "NatsPublishCommand"],
) -> "NatsPublishCommand":
    """Return a NATS publish command equivalent to ``cmd``.

    A ``NatsPublishCommand`` is handed back unchanged -- the very same
    object, not a copy. Any other command is converted into a new ``cls``
    instance from its body, destination, correlation id, headers, reply-to
    and publish type. ``stream`` and ``timeout`` are not passed, so the
    constructor defaults apply; the headers object is passed through as-is
    rather than copied here.
    """
    if not isinstance(cmd, NatsPublishCommand):
        # Generic command: rebuild it as a NATS-specific one.
        return cls(
            message=cmd.body,
            subject=cmd.destination,
            correlation_id=cmd.correlation_id,
            headers=cmd.headers,
            reply_to=cmd.reply_to,
            _publish_type=cmd.publish_type,
        )

    # NOTE: Should return a copy probably.
    return cmd

as_publish_command #

as_publish_command()

Transforms a handler's Response result into a DTO for publishers.

Source code in faststream/response/response.py
def as_publish_command(self) -> "PublishCommand":
    """Convert this response into a ``PublishCommand`` DTO for publishers.

    Only the body, headers and correlation id are carried over; the publish
    type of the resulting command is always ``PublishType.PUBLISH``.
    """
    payload = self.body
    message_headers = self.headers
    message_correlation_id = self.correlation_id

    return PublishCommand(
        body=payload,
        headers=message_headers,
        correlation_id=message_correlation_id,
        _publish_type=PublishType.PUBLISH,
    )

add_headers #

add_headers(headers, *, override=True)
Source code in faststream/response/response.py
def add_headers(
    self,
    headers: dict[str, Any],
    *,
    override: bool = True,
) -> None:
    """Merge ``headers`` into the headers stored on this object.

    Args:
        headers: Extra headers to add.
        override: When true (the default), values from ``headers`` replace
            existing ones with the same key and the current mapping is
            updated in place. When false, existing values are kept on a
            key collision and ``self.headers`` is rebound to a new mapping.
    """
    if not override:
        # Existing entries are the right-hand operand, so they win.
        self.headers = headers | self.headers
        return

    # In-place union: incoming entries overwrite existing ones.
    self.headers |= headers