Bases: StreamMessage[Union['ConsumerRecord', tuple['ConsumerRecord', ...]]]
Represents a Kafka message in the FastStream framework.
This class extends StreamMessage
and is specialized for handling Kafka ConsumerRecord objects.
Source code in faststream/kafka/message.py
def __init__(self, *args: Any, consumer: ConsumerProtocol, **kwargs: Any) -> None:
    """Initialize the message and bind it to the consumer that produced it.

    Args:
        *args: Positional arguments forwarded unchanged to the parent
            initializer.
        consumer: Consumer object kept on the message as ``self.consumer``.
        **kwargs: Keyword arguments forwarded unchanged to the parent
            initializer.
    """
    super().__init__(*args, **kwargs)
    # The message starts out already marked as acknowledged.
    # NOTE(review): presumably because offsets are auto-committed for this
    # message type -- confirm against the consumer configuration.
    self.committed = AckStatus.ACKED
    self.consumer = consumer
consumer
instance-attribute
committed
instance-attribute
raw_message
instance-attribute
raw_message = raw_message
reply_to
instance-attribute
content_type
instance-attribute
content_type = content_type
source_type
instance-attribute
source_type = source_type
batch_headers = batch_headers or []
correlation_id
instance-attribute
correlation_id = correlation_id or str(uuid4())
message_id
instance-attribute
message_id = message_id or correlation_id
processed
instance-attribute
set_decoder
Source code in faststream/message/message.py
def set_decoder(self, decoder: "AsyncCallable") -> None:
    """Store the decoder callable on this message.

    Args:
        decoder: Callable to keep in the message's private decoder attribute,
            replacing whatever was stored there before.
    """
    self.__decoder = decoder
clear_cache
Source code in faststream/message/message.py
def clear_cache(self) -> None:
    """Empty the message's private cache of decoded results in place."""
    cached_results = self.__decoded_caches
    cached_results.clear()
decode
async
Deserialize the message using the lazily applied decoder.
The decoded result is cached after the first call. To force the message to be decoded again, call
message.clear_cache()
after message.body
changes.
Source code in faststream/message/message.py
async def decode(self) -> Optional["Any"]:
    """Deserialize the message with the lazily applied decoder.

    The decoder is awaited at most once per decoder object: its result is
    stored and handed back by later calls, including when that result is
    ``None``. Call ``message.clear_cache()`` after ``message.body`` changes
    to make decoding run again.

    Returns:
        The value produced by the decoder (possibly ``None``).

    Raises:
        AssertionError: If no decoder has been set via ``set_decoder()``
            (only while assertions are enabled).
    """
    decoder = self.__decoder
    assert decoder, "You should call `set_decoder()` method first."
    cache = self.__decoded_caches
    # Test membership instead of comparing the looked-up value with None:
    # a decoder may legitimately return None, and that result has to be
    # served from the cache too rather than re-running the decoder.
    if decoder in cache:
        return cache[decoder]
    result = cache[decoder] = await decoder(self)
    return result
ack
async
Source code in faststream/message/message.py
async def ack(self) -> None:
    """Record the message as acknowledged if no status is recorded yet.

    When ``self.committed`` already holds a value, it is left untouched.
    """
    if self.committed is not None:
        return
    self.committed = AckStatus.ACKED
nack
async
Source code in faststream/message/message.py
async def nack(self) -> None:
    """Record the message as negatively acknowledged if no status is recorded yet.

    When ``self.committed`` already holds a value, it is left untouched.
    """
    if self.committed is not None:
        return
    self.committed = AckStatus.NACKED
reject
async
Source code in faststream/message/message.py
async def reject(self) -> None:
    """Record the message as rejected if no status is recorded yet.

    When ``self.committed`` already holds a value, it is left untouched.
    """
    if self.committed is not None:
        return
    self.committed = AckStatus.REJECTED