RabbitMessage(
raw_message,
body,
*,
headers=None,
reply_to="",
batch_headers=None,
path=None,
content_type=None,
correlation_id=None,
message_id=None,
source_type=CONSUME,
)
Bases: StreamMessage[IncomingMessage]
A message class for working with RabbitMQ messages.
This class extends StreamMessage
to provide additional functionality for acknowledging, rejecting,
or nack-ing RabbitMQ messages.
Source code in faststream/message/message.py
| def __init__(
self,
raw_message: "MsgType",
body: bytes | Any,
*,
headers: dict[str, Any] | None = None,
reply_to: str = "",
batch_headers: list[dict[str, Any]] | None = None,
path: dict[str, Any] | None = None,
content_type: str | None = None,
correlation_id: str | None = None,
message_id: str | None = None,
source_type: SourceType = SourceType.CONSUME,
) -> None:
self.raw_message = raw_message
self.body = body
self.reply_to = reply_to
self.content_type = content_type
self.source_type = source_type
self.headers = headers or {}
self.batch_headers = batch_headers or []
self.path = path or {}
self.correlation_id = correlation_id or str(uuid4())
self.message_id = message_id or self.correlation_id
self.committed: AckStatus | None = None
self.processed = False
# Setup later
self.__decoder: AsyncCallable | None = None
self.__decoded_caches: dict[
Any,
Any,
] = {} # Cache values between filters and tests
|
raw_message
instance-attribute
raw_message = raw_message
reply_to
instance-attribute
content_type
instance-attribute
content_type = content_type
source_type
instance-attribute
source_type = source_type
batch_headers = batch_headers or []
correlation_id
instance-attribute
correlation_id = correlation_id or str(uuid4())
message_id
instance-attribute
message_id = message_id or correlation_id
committed
instance-attribute
processed
instance-attribute
ack
async
Acknowledge the RabbitMQ message.
Source code in faststream/rabbit/message.py
| async def ack(
self,
multiple: bool = False,
) -> None:
"""Acknowledge the RabbitMQ message."""
pika_message = self.raw_message
await super().ack()
if pika_message.locked:
return
await pika_message.ack(multiple=multiple)
|
nack
async
nack(multiple=False, requeue=True)
Negative Acknowledgment of the RabbitMQ message.
Source code in faststream/rabbit/message.py
| async def nack(
self,
multiple: bool = False,
requeue: bool = True,
) -> None:
"""Negative Acknowledgment of the RabbitMQ message."""
pika_message = self.raw_message
await super().nack()
if pika_message.locked:
return
await pika_message.nack(multiple=multiple, requeue=requeue)
|
reject
async
Reject the RabbitMQ message.
Source code in faststream/rabbit/message.py
| async def reject(
self,
requeue: bool = False,
) -> None:
"""Reject the RabbitMQ message."""
pika_message = self.raw_message
await super().reject()
if pika_message.locked:
return
await pika_message.reject(requeue=requeue)
|
set_decoder
Source code in faststream/message/message.py
| def set_decoder(self, decoder: "AsyncCallable") -> None:
self.__decoder = decoder
|
clear_cache
Source code in faststream/message/message.py
| def clear_cache(self) -> None:
self.__decoded_caches.clear()
|
decode
async
Serialize the message by lazy decoder.
Returns a cache after first usage. To prevent such behavior, please call
message.clear_cache()
after message.body
changes.
Source code in faststream/message/message.py
| async def decode(self) -> Optional["Any"]:
"""Serialize the message by lazy decoder.
Returns a cache after first usage. To prevent such behavior, please call
`message.clear_cache()` after `message.body` changes.
"""
assert self.__decoder, "You should call `set_decoder()` method first."
if (result := self.__decoded_caches.get(self.__decoder)) is None:
result = self.__decoded_caches[self.__decoder] = await self.__decoder(self)
return result
|