Bases: StreamMessage[Union['ConsumerRecord', tuple['ConsumerRecord', ...]]]
Represents a Kafka message in the FastStream framework.
This class extends StreamMessage and is specialized for handling Kafka ConsumerRecord objects.
Source code in faststream/kafka/message.py
def __init__(self, *args: Any, consumer: ConsumerProtocol, **kwargs: Any) -> None:
    """Create a Kafka message bound to the consumer that received it.

    Args:
        *args: Positional arguments forwarded unchanged to the parent
            initializer.
        consumer: Consumer object stored on the message as ``self.consumer``.
        **kwargs: Keyword arguments forwarded unchanged to the parent
            initializer.
    """
    super().__init__(*args, **kwargs)

    self.consumer = consumer
    # Assigned after the parent initializer has run, so this is the value
    # the instance ends up with: the message starts out marked as ACKED.
    self.committed = AckStatus.ACKED
consumer instance-attribute
committed instance-attribute
raw_message instance-attribute
raw_message = raw_message
reply_to instance-attribute
content_type instance-attribute
content_type = content_type
source_type instance-attribute
source_type = source_type
batch_headers = batch_headers or []
correlation_id instance-attribute
correlation_id = correlation_id or str(uuid4())
message_id instance-attribute
message_id = message_id or correlation_id
processed instance-attribute
set_decoder
set_decoder(decoder: AsyncCallable) -> None
Source code in faststream/message/message.py
def set_decoder(self, decoder: "AsyncCallable") -> None:
    """Store the callable that `decode()` will await to decode this message.

    Args:
        decoder: Async callable; `decode()` awaits it with the message
            itself as the only argument.
    """
    self.__decoder = decoder
clear_cache
Source code in faststream/message/message.py
def clear_cache(self) -> None:
    """Drop every cached decode result held by this message.

    After this call the next `decode()` has no cached value to return and
    invokes the decoder again.
    """
    self.__decoded_caches.clear()
decode async
Decode the message using the lazily applied decoder.
The decoded result is cached after the first call and returned on later calls. To force a fresh decode, call message.clear_cache() after message.body changes.
Source code in faststream/message/message.py
async def decode(self) -> Optional["Any"]:
    """Decode the message with the decoder set via `set_decoder()`.

    The result is cached per decoder: the first call awaits the decoder and
    stores whatever it returns, and later calls return the stored value
    without invoking the decoder again. This holds even when the decoder
    returns ``None``. Call `message.clear_cache()` after `message.body`
    changes to force a fresh decode.

    Returns:
        The value produced by the decoder (possibly ``None``).

    Raises:
        AssertionError: If no decoder has been set.
    """
    assert self.__decoder, "You should call `set_decoder()` method first."

    decoder = self.__decoder
    cache = self.__decoded_caches

    # Test membership rather than comparing the cached value to None, so a
    # decoder that legitimately yields None is still only run once.
    if decoder in cache:
        return cache[decoder]

    result = cache[decoder] = await decoder(self)
    return result
ack async
Source code in faststream/message/message.py
async def ack(self) -> None:
    """Record the message as ACKED unless a status has already been set."""
    if self.committed is not None:
        # A status was recorded earlier; leave it untouched.
        return
    self.committed = AckStatus.ACKED
nack async
Source code in faststream/message/message.py
async def nack(self) -> None:
    """Record the message as NACKED unless a status has already been set."""
    if self.committed is not None:
        # A status was recorded earlier; leave it untouched.
        return
    self.committed = AckStatus.NACKED
reject async
Source code in faststream/message/message.py
async def reject(self) -> None:
    """Record the message as REJECTED unless a status has already been set."""
    if self.committed is not None:
        # A status was recorded earlier; leave it untouched.
        return
    self.committed = AckStatus.REJECTED