Skip to content

RabbitMessage

faststream.rabbit.message.RabbitMessage #

RabbitMessage(
    raw_message,
    body,
    *,
    headers=None,
    reply_to="",
    batch_headers=None,
    path=None,
    content_type=None,
    correlation_id=None,
    message_id=None,
    source_type=CONSUME,
)

Bases: StreamMessage[IncomingMessage]

A message class for working with RabbitMQ messages.

This class extends StreamMessage to provide additional functionality for acknowledging, rejecting, or nack-ing RabbitMQ messages.

Source code in faststream/message/message.py
def __init__(
    self,
    raw_message: "MsgType",
    body: bytes | Any,
    *,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    batch_headers: list[dict[str, Any]] | None = None,
    path: dict[str, Any] | None = None,
    content_type: str | None = None,
    correlation_id: str | None = None,
    message_id: str | None = None,
    source_type: SourceType = SourceType.CONSUME,
) -> None:
    """Store the raw message, its body, and the accompanying metadata.

    Args:
        raw_message: The underlying broker message object; stored as-is.
        body: The message payload; stored as-is.
        headers: Message headers. A new empty dict is used when omitted
            or empty.
        reply_to: Reply destination; stored as-is.
        batch_headers: Per-message headers for a batch. A new empty list
            is used when omitted or empty.
        path: Path parameters. A new empty dict is used when omitted or
            empty.
        content_type: Content type of the body; stored as-is.
        correlation_id: Correlation identifier. A random UUID4 string is
            generated when omitted or empty.
        message_id: Message identifier. Falls back to the correlation id
            when omitted or empty.
        source_type: Stored as-is on the instance.
    """
    # Values taken verbatim from the caller.
    self.raw_message = raw_message
    self.body = body
    self.reply_to = reply_to
    self.content_type = content_type
    self.source_type = source_type

    # Falsy containers are replaced with fresh empty ones so every
    # instance owns its own mutable state.
    self.headers = headers if headers else {}
    self.batch_headers = batch_headers if batch_headers else []
    self.path = path if path else {}

    # Identifiers: generate a correlation id when absent, then let the
    # message id default to it.
    self.correlation_id = correlation_id if correlation_id else str(uuid4())
    self.message_id = message_id if message_id else self.correlation_id

    # Nothing has been committed or processed yet.
    self.committed: AckStatus | None = None
    self.processed = False

    # The decoder is attached later through `set_decoder()`.
    self.__decoder: AsyncCallable | None = None
    # Cache values between filters and tests
    self.__decoded_caches: dict[Any, Any] = {}

raw_message instance-attribute #

raw_message = raw_message

body instance-attribute #

body = body

reply_to instance-attribute #

reply_to = reply_to

content_type instance-attribute #

content_type = content_type

source_type instance-attribute #

source_type = source_type

headers instance-attribute #

headers = headers or {}

batch_headers instance-attribute #

batch_headers = batch_headers or []

path instance-attribute #

path = path or {}

correlation_id instance-attribute #

correlation_id = correlation_id or str(uuid4())

message_id instance-attribute #

message_id = message_id or correlation_id

committed instance-attribute #

committed = None

processed instance-attribute #

processed = False

ack async #

ack(multiple=False)

Acknowledge the RabbitMQ message.

Source code in faststream/rabbit/message.py
async def ack(self, multiple: bool = False) -> None:
    """Acknowledge the RabbitMQ message.

    Runs the parent class's `ack()` first, then forwards the
    acknowledgement to the raw message unless that message reports
    itself as locked.

    Args:
        multiple: Passed through to the raw message's `ack()`.
    """
    incoming = self.raw_message
    await super().ack()
    # A locked raw message is left untouched.
    if not incoming.locked:
        await incoming.ack(multiple=multiple)

nack async #

nack(multiple=False, requeue=True)

Negatively acknowledge the RabbitMQ message.

Source code in faststream/rabbit/message.py
async def nack(self, multiple: bool = False, requeue: bool = True) -> None:
    """Negatively acknowledge the RabbitMQ message.

    Runs the parent class's `nack()` first, then forwards the negative
    acknowledgement to the raw message unless that message reports
    itself as locked.

    Args:
        multiple: Passed through to the raw message's `nack()`.
        requeue: Passed through to the raw message's `nack()`.
    """
    incoming = self.raw_message
    await super().nack()
    # A locked raw message is left untouched.
    if not incoming.locked:
        await incoming.nack(multiple=multiple, requeue=requeue)

reject async #

reject(requeue=False)

Reject the RabbitMQ message.

Source code in faststream/rabbit/message.py
async def reject(self, requeue: bool = False) -> None:
    """Reject the RabbitMQ message.

    Runs the parent class's `reject()` first, then forwards the
    rejection to the raw message unless that message reports itself as
    locked.

    Args:
        requeue: Passed through to the raw message's `reject()`.
    """
    incoming = self.raw_message
    await super().reject()
    # A locked raw message is left untouched.
    if not incoming.locked:
        await incoming.reject(requeue=requeue)

set_decoder #

set_decoder(decoder)
Source code in faststream/message/message.py
def set_decoder(self, decoder: "AsyncCallable") -> None:
    """Attach the decoder that `decode()` will await.

    Args:
        decoder: Async callable stored on the instance, replacing any
            previously set decoder.
    """
    self.__decoder = decoder

clear_cache #

clear_cache()
Source code in faststream/message/message.py
def clear_cache(self) -> None:
    """Drop every cached decoding result held by this message."""
    self.__decoded_caches.clear()

decode async #

decode()

Decode the message using the lazily assigned decoder.

Returns the cached result after the first call. To force the message to be decoded again, call message.clear_cache() after message.body changes.

Source code in faststream/message/message.py
async def decode(self) -> Optional["Any"]:
    """Decode the message using the lazily assigned decoder.

    The result is cached per decoder, so repeated calls return the cached
    value without awaiting the decoder again. A decoded value of `None`
    is cached as well. To force re-decoding, call `message.clear_cache()`
    after `message.body` changes.

    Returns:
        Whatever the decoder returned for this message.

    Raises:
        AssertionError: If no decoder was set via `set_decoder()`.
    """
    decoder = self.__decoder
    if not decoder:
        # Raised explicitly (rather than via `assert`) so the check is
        # not stripped when Python runs with -O.
        raise AssertionError("You should call `set_decoder()` method first.")

    cache = self.__decoded_caches
    # Membership test instead of `.get(...) is None`: a decoder may
    # legitimately return None, and that result must be cached too.
    if decoder in cache:
        return cache[decoder]

    result = cache[decoder] = await decoder(self)
    return result