PatchedMessage

faststream.nats.testing.PatchedMessage #

PatchedMessage(
    _client: NATS,
    subject: str = "",
    reply: str = "",
    data: bytes = b"",
    headers: Optional[Dict[str, str]] = None,
    _metadata: Optional[Metadata] = None,
    _ackd: bool = False,
    _sid: Optional[int] = None,
)

Bases: Msg

subject class-attribute instance-attribute #

subject: str = ''

reply class-attribute instance-attribute #

reply: str = ''

data class-attribute instance-attribute #

data: bytes = b''

headers class-attribute instance-attribute #

headers: Optional[Dict[str, str]] = None

header property #

header: Optional[Dict[str, str]]

header returns the headers from a message.

is_acked property #

is_acked: bool

Whether a terminal ack (not an in-progress ack) has already been sent in response to this message.

sid property #

sid: int

sid returns the subscription ID from a message.

metadata property #

metadata: Metadata

metadata returns the Metadata of a JetStream message.

Ack #

Ack class-attribute instance-attribute #

Ack = b'+ACK'

Nak class-attribute instance-attribute #

Nak = b'-NAK'

Progress class-attribute instance-attribute #

Progress = b'+WPI'

Term class-attribute instance-attribute #

Term = b'+TERM'

Prefix0 class-attribute instance-attribute #

Prefix0 = '$JS'

Prefix1 class-attribute instance-attribute #

Prefix1 = 'ACK'

Domain class-attribute instance-attribute #

Domain = 2

AccHash class-attribute instance-attribute #

AccHash = 3

Stream class-attribute instance-attribute #

Stream = 4

Consumer class-attribute instance-attribute #

Consumer = 5

NumDelivered class-attribute instance-attribute #

NumDelivered = 6

StreamSeq class-attribute instance-attribute #

StreamSeq = 7

ConsumerSeq class-attribute instance-attribute #

ConsumerSeq = 8

Timestamp class-attribute instance-attribute #

Timestamp = 9

NumPending class-attribute instance-attribute #

NumPending = 10
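
The integer constants above read naturally as token positions in a dot-separated JetStream ack reply subject, with Prefix0 and Prefix1 as the two leading tokens. The sketch below illustrates that reading without importing nats-py. It copies the values into local names, and `parse_ack_subject` is a hypothetical helper, not part of the library. It assumes the 11-token layout implied by these indices and a timestamp in nanoseconds since the Unix epoch; the sample subject is made up.

```python
from datetime import datetime, timezone

# Values copied from the constants listed above (local names, not the library's).
PREFIX0, PREFIX1 = "$JS", "ACK"
DOMAIN, ACC_HASH, STREAM, CONSUMER = 2, 3, 4, 5
NUM_DELIVERED, STREAM_SEQ, CONSUMER_SEQ, TIMESTAMP, NUM_PENDING = 6, 7, 8, 9, 10


def parse_ack_subject(reply: str) -> dict:
    """Split an ack reply subject into named fields (hypothetical helper)."""
    tokens = reply.split(".")
    # Require the two prefix tokens and enough tokens to reach NUM_PENDING.
    if len(tokens) <= NUM_PENDING or tokens[:2] != [PREFIX0, PREFIX1]:
        raise ValueError(f"not a JetStream ack subject: {reply!r}")
    return {
        "domain": tokens[DOMAIN],
        "account_hash": tokens[ACC_HASH],
        "stream": tokens[STREAM],
        "consumer": tokens[CONSUMER],
        "num_delivered": int(tokens[NUM_DELIVERED]),
        "stream_seq": int(tokens[STREAM_SEQ]),
        "consumer_seq": int(tokens[CONSUMER_SEQ]),
        # Assumption: the timestamp token is nanoseconds since the epoch.
        "timestamp": datetime.fromtimestamp(
            int(tokens[TIMESTAMP]) / 1e9, tz=timezone.utc
        ),
        "num_pending": int(tokens[NUM_PENDING]),
    }


# Made-up subject used only for illustration.
fields = parse_ack_subject(
    "$JS.ACK.hub.acchash.ORDERS.worker.2.41.17.1700000000000000000.5"
)
```

Subjects with fewer tokens are rejected by this sketch rather than handled.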

Metadata dataclass #

Metadata(
    sequence: SequencePair,
    num_pending: int,
    num_delivered: int,
    timestamp: datetime,
    stream: str,
    consumer: str,
    domain: Optional[str] = None,
)

Metadata is the metadata from a JetStream message.

  • num_pending is the number of available messages in the Stream that have not been consumed yet.
  • num_delivered is the number of times that this message has been delivered. For example, num_delivered higher than one means that there have been redeliveries.
  • timestamp is the time at which the message was delivered.
  • stream is the name of the stream.
  • consumer is the name of the consumer.
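
As a rough illustration of how these fields might be read, the sketch below defines local stand-ins that mirror the documented `Metadata` and `SequencePair` fields, so it runs without nats-py installed. The helper `is_redelivery` and all sample values are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class SequencePair:
    """Local stand-in mirroring the documented SequencePair fields."""
    consumer: int
    stream: int


@dataclass
class Metadata:
    """Local stand-in mirroring the documented Metadata fields."""
    sequence: SequencePair
    num_pending: int
    num_delivered: int
    timestamp: datetime
    stream: str
    consumer: str
    domain: Optional[str] = None


def is_redelivery(meta: Metadata) -> bool:
    # num_delivered higher than one means there have been redeliveries.
    return meta.num_delivered > 1


# Made-up values for illustration only.
meta = Metadata(
    sequence=SequencePair(consumer=17, stream=41),
    num_pending=5,
    num_delivered=2,
    timestamp=datetime(2023, 11, 14, tzinfo=timezone.utc),
    stream="ORDERS",
    consumer="worker",
)
redelivered = is_redelivery(meta)
```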

sequence instance-attribute #

sequence: SequencePair

num_pending instance-attribute #

num_pending: int

num_delivered instance-attribute #

num_delivered: int

timestamp instance-attribute #

timestamp: datetime

stream instance-attribute #

stream: str

consumer instance-attribute #

consumer: str

domain class-attribute instance-attribute #

domain: Optional[str] = None

SequencePair dataclass #

SequencePair(consumer: int, stream: int)

SequencePair represents a pair of consumer and stream sequence.

consumer instance-attribute #

consumer: int

stream instance-attribute #

stream: int

ack async #

ack() -> None

No-op override for tests: the method body is empty, so no acknowledgement is published.

Source code in faststream/nats/testing.py
async def ack(self) -> None:
    pass

ack_sync async #

ack_sync(timeout: float = 1) -> PatchedMessage

Test override that returns the message itself immediately; timeout is accepted but unused.

Source code in faststream/nats/testing.py
async def ack_sync(
    self,
    timeout: float = 1,
) -> "PatchedMessage":  # pragma: no cover
    return self

nak async #

nak(delay: float | None = None) -> None

No-op override for tests: the method body is empty and delay is ignored.

Source code in faststream/nats/testing.py
async def nak(self, delay: float | None = None) -> None:
    pass

term async #

term() -> None

No-op override for tests: the method body is empty.

Source code in faststream/nats/testing.py
async def term(self) -> None:
    pass

in_progress async #

in_progress() -> None

No-op override for tests: the method body is empty.

Source code in faststream/nats/testing.py
async def in_progress(self) -> None:
    pass
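
Because the acknowledgement methods listed above have empty bodies (and ack_sync simply returns the message), handler code that acknowledges messages can run in a test without a server. The sketch below shows the idea with a local stand-in class instead of importing faststream. `NoOpAckMessage` and `handle` are hypothetical names, and the handler logic is invented for illustration.

```python
import asyncio
from typing import Optional


class NoOpAckMessage:
    """Local stand-in mirroring the no-op methods shown above."""

    def __init__(self, data: bytes = b"") -> None:
        self.data = data

    async def ack(self) -> None:
        pass

    async def ack_sync(self, timeout: float = 1) -> "NoOpAckMessage":
        return self

    async def nak(self, delay: Optional[float] = None) -> None:
        pass

    async def term(self) -> None:
        pass

    async def in_progress(self) -> None:
        pass


async def handle(msg: NoOpAckMessage) -> str:
    """Hypothetical handler: nak empty payloads, ack everything else."""
    await msg.in_progress()
    if not msg.data:
        await msg.nak(delay=0.5)
        return "nak"
    await msg.ack()
    return "ack"


# Both calls complete without any network traffic.
outcome_ok = asyncio.run(handle(NoOpAckMessage(b"payload")))
outcome_empty = asyncio.run(handle(NoOpAckMessage()))
```

One consequence of this design is that a test cannot observe which acknowledgement was sent by inspecting the message alone; the sketch returns a string from the handler for that reason.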

respond async #

respond(data: bytes) -> None

respond replies to the inbox of the message if there is one.

Source code in nats/aio/msg.py
async def respond(self, data: bytes) -> None:
    """
    respond replies to the inbox of the message if there is one.
    """
    if not self.reply:
        raise Error("no reply subject available")
    if not self._client:
        raise Error("client not set")

    await self._client.publish(self.reply, data, headers=self.headers)
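
The guard order in respond can be exercised in isolation. The sketch below copies the checks shown above into a local stand-in class and pairs it with a recording client, so it runs without nats-py. `SketchMessage`, `RecordingClient`, and the local `Error` class are hypothetical names introduced for illustration, and the subject and header values are made up.

```python
import asyncio
from typing import Dict, List, Optional, Tuple


class Error(Exception):
    """Local stand-in for the error type raised by respond."""


class RecordingClient:
    """Hypothetical client that records publishes instead of sending them."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, bytes, Optional[Dict[str, str]]]] = []

    async def publish(
        self, subject: str, data: bytes, headers: Optional[Dict[str, str]] = None
    ) -> None:
        self.published.append((subject, data, headers))


class SketchMessage:
    """Stand-in carrying only the attributes respond uses."""

    def __init__(
        self,
        client: Optional[RecordingClient],
        reply: str = "",
        headers: Optional[Dict[str, str]] = None,
    ) -> None:
        self._client = client
        self.reply = reply
        self.headers = headers

    async def respond(self, data: bytes) -> None:
        # Same checks, in the same order, as the source shown above.
        if not self.reply:
            raise Error("no reply subject available")
        if not self._client:
            raise Error("client not set")
        await self._client.publish(self.reply, data, headers=self.headers)


client = RecordingClient()
msg = SketchMessage(client, reply="_INBOX.demo", headers={"trace": "abc"})
asyncio.run(msg.respond(b"pong"))
```

Note that the reply is published with the original message's headers, and that a message with the default empty reply raises before the client is consulted.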