Skip to content

NatsResponse

faststream.nats.response.NatsResponse #

NatsResponse(
    body, *, headers=None, correlation_id=None, stream=None
)

Bases: Response

Source code in faststream/nats/response.py
def __init__(
    self,
    body: "SendableMessage",
    *,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    stream: str | None = None,
) -> None:
    """Initialise a NATS response.

    Args:
        body: Message payload; forwarded unchanged to the base initialiser.
        headers: Optional message headers; forwarded to the base initialiser.
        correlation_id: Optional correlation id; forwarded to the base
            initialiser.
        stream: Optional NATS-specific stream value, stored on the instance
            as ``self.stream``.
    """
    # Broker-agnostic fields are owned by the base class.
    super().__init__(body=body, headers=headers, correlation_id=correlation_id)

    # The only state this subclass adds on top of the base response.
    self.stream = stream

stream instance-attribute #

stream = stream

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

as_publish_command #

as_publish_command()
Source code in faststream/nats/response.py
@override
def as_publish_command(self) -> "NatsPublishCommand":
    """Convert this response into a ``NatsPublishCommand``.

    Returns:
        A command carrying this response's body, headers and correlation id,
        tagged with ``PublishType.PUBLISH`` and this response's ``stream``.
    """
    command = NatsPublishCommand(
        message=self.body,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
        # NATS-specific fields.
        # NOTE(review): subject is left empty here; presumably it is filled
        # in by whoever consumes the command - confirm against the caller.
        subject="",
        stream=self.stream,
    )
    return command