Bases: NatsBaseParser
A class to parse NATS core messages.
Source code in faststream/nats/parser.py
def __init__(self, *, pattern: str, is_ack_disabled: bool) -> None:
    """Set up the parser.

    Args:
        pattern: Forwarded unchanged to the base class initializer.
        is_ack_disabled: Stored on the instance; ``parse_message`` reads
            it to decide whether to set ``_ackd`` on incoming messages.
    """
    # Base initializer runs first, exactly as before; it receives only
    # the pattern.
    super().__init__(pattern=pattern)

    self.is_ack_disabled = is_ack_disabled
|
is_ack_disabled instance-attribute
is_ack_disabled = is_ack_disabled
parse_message async
Source code in faststream/nats/parser.py
async def parse_message(
    self,
    message: "Msg",
) -> "StreamMessage[Msg]":
    """Wrap a raw message in a ``NatsMessage``.

    The subject is matched against ``self._path_re`` via ``match_path``,
    and the message headers (an empty dict when ``message.header`` is
    falsy) supply the content type, message id and correlation id.

    Side effect: when ``self.is_ack_disabled`` is true, ``message._ackd``
    is set to ``True`` on the raw message before it is wrapped.
    NOTE(review): presumably this makes later ack calls no-ops - confirm
    against the NATS client's ``Msg`` implementation.

    Args:
        message: The raw incoming message.

    Returns:
        A ``NatsMessage`` built from the raw message and its headers.
    """
    matched_path = match_path(self._path_re, message.subject)
    raw_headers = message.header or {}

    if self.is_ack_disabled:
        message._ackd = True

    # Header-derived metadata; only the content type has a default ("").
    content_type = raw_headers.get("content-type", "")
    message_id = raw_headers.get("message_id")
    correlation_id = raw_headers.get("correlation_id")

    return NatsMessage(
        raw_message=message,
        body=message.data,
        path=matched_path,
        reply_to=message.reply,
        headers=raw_headers,
        content_type=content_type,
        message_id=message_id,
        correlation_id=correlation_id,
    )
|
decode_message async
Source code in faststream/nats/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[Any]",
) -> "DecodedMessage":
    """Decode an already-parsed message.

    Args:
        msg: The parsed stream message to decode.

    Returns:
        Whatever the module-level ``decode_message`` helper returns for
        ``msg``.
    """
    # Inside the method body this name resolves to the module-level
    # ``decode_message`` helper, not to this method, so there is no
    # recursion.
    decoded = decode_message(msg)
    return decoded
|