Skip to content

NatsParser

faststream.nats.parser.NatsParser #

NatsParser(*, pattern: str, is_ack_disabled: bool)

Bases: NatsBaseParser

A class to parse NATS core messages.

Source code in faststream/nats/parser.py
def __init__(self, *, pattern: str, is_ack_disabled: bool) -> None:
    """Initialize the parser.

    Args:
        pattern: Forwarded unchanged to the base class ``__init__``.
        is_ack_disabled: Stored on the instance as ``is_ack_disabled``;
            ``parse_message`` reads it to decide whether to set ``_ackd``
            on the raw message.
    """
    super().__init__(pattern=pattern)

    self.is_ack_disabled = is_ack_disabled

is_ack_disabled instance-attribute #

is_ack_disabled = is_ack_disabled

parse_message async #

parse_message(message: Msg) -> StreamMessage[Msg]
Source code in faststream/nats/parser.py
async def parse_message(
    self,
    message: "Msg",
) -> "StreamMessage[Msg]":
    """Wrap a raw NATS message in a ``NatsMessage``.

    Args:
        message: The raw message; its ``subject``, ``header``, ``data`` and
            ``reply`` attributes are read.

    Returns:
        A ``NatsMessage`` built from the raw message, the result of matching
        its subject against ``self._path_re``, and its headers (an empty
        dict when the raw message carries none).
    """
    subject_params = match_path(self._path_re, message.subject)

    raw_headers = message.header
    meta = raw_headers if raw_headers else {}

    if self.is_ack_disabled:
        # Mark the raw message as already acknowledged.
        # NOTE(review): presumably this makes later ack calls no-ops in the
        # NATS client -- confirm against the client's ``Msg`` implementation.
        message._ackd = True

    return NatsMessage(
        raw_message=message,
        body=message.data,
        path=subject_params,
        reply_to=message.reply,
        headers=meta,
        content_type=meta.get("content-type", ""),
        message_id=meta.get("message_id"),
        correlation_id=meta.get("correlation_id"),
    )

decode_message async #

decode_message(msg: StreamMessage[Any]) -> DecodedMessage
Source code in faststream/nats/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[Any]",
) -> "DecodedMessage":
    """Decode a parsed message by delegating to ``decode_message``.

    Args:
        msg: The stream message to decode.

    Returns:
        Whatever the ``decode_message`` function returns for ``msg``.
    """
    # The bare name is looked up outside the class body, so this calls the
    # ``decode_message`` function from the enclosing scope, not this method.
    return decode_message(msg)