KafkaMessage(*args, consumer, is_manual=False, **kwargs)
Bases: StreamMessage[Message | tuple[Message, ...]]
Represents a Kafka message in the FastStream framework.
This class extends StreamMessage and is specialized for handling confluent_kafka.Message objects, either a single message or a tuple of messages.
Source code in faststream/confluent/message.py
def __init__(
    self,
    *args: Any,
    consumer: ConsumerProtocol,
    is_manual: bool = False,
    **kwargs: Any,
) -> None:
    """Create a Kafka message bound to the consumer that received it.

    Args:
        *args: Positional arguments forwarded to the parent initializer.
        consumer: Consumer used later by `ack()` and `nack()`.
        is_manual: Whether acknowledgement is driven manually. When false,
            the message is marked as acknowledged right away.
        **kwargs: Keyword arguments forwarded to the parent initializer.
    """
    super().__init__(*args, **kwargs)

    self.consumer = consumer
    self.is_manual = is_manual

    if is_manual:
        # Manual mode: leave `committed` as the parent initializer set it.
        return

    self.committed = AckStatus.ACKED
consumer
instance-attribute
is_manual
instance-attribute
committed
instance-attribute
raw_message
instance-attribute
raw_message = raw_message
reply_to
instance-attribute
content_type
instance-attribute
content_type = content_type
source_type
instance-attribute
source_type = source_type
batch_headers = batch_headers or []
correlation_id
instance-attribute
correlation_id = correlation_id or str(uuid4())
message_id
instance-attribute
message_id = message_id or correlation_id
processed
instance-attribute
ack
async
Acknowledge the Kafka message.
Source code in faststream/confluent/message.py
async def ack(self) -> None:
    """Acknowledge the Kafka message.

    In manual mode, and only while no ack status has been recorded yet, the
    consumer is asked to commit. The parent `ack()` is always awaited
    afterwards.
    """
    needs_commit = self.is_manual and not self.committed
    if needs_commit:
        await self.consumer.commit()

    await super().ack()
nack
async
Reject the Kafka message.
Source code in faststream/confluent/message.py
async def nack(self) -> None:
    """Reject the Kafka message.

    In manual mode, and only while no ack status has been recorded yet, the
    consumer is sought back to the topic, partition and offset of this
    message. For a tuple of messages the first one is used. The parent
    `nack()` is always awaited afterwards.
    """
    if self.is_manual and not self.committed:
        received = self.raw_message
        # A tuple of messages is rewound to its first element.
        first: Message = received[0] if isinstance(received, tuple) else received

        await self.consumer.seek(
            topic=first.topic(),
            partition=first.partition(),
            offset=first.offset(),
        )

    await super().nack()
set_decoder
Source code in faststream/message/message.py
def set_decoder(self, decoder: "AsyncCallable") -> None:
    """Store the decoder that `decode()` will call for this message.

    Args:
        decoder: Async callable that `decode()` awaits with this message.
    """
    self.__decoder = decoder
clear_cache
Source code in faststream/message/message.py
def clear_cache(self) -> None:
    """Drop every cached decode result so the next `decode()` runs the decoder again."""
    cached_results = self.__decoded_caches
    cached_results.clear()
decode
async
Decode the message using the lazily assigned decoder.
The decoded result is cached after the first call. To force re-decoding, call message.clear_cache() after message.body changes.
Source code in faststream/message/message.py
async def decode(self) -> Optional["Any"]:
    """Decode the message with the lazily assigned decoder.

    The decoded value is cached per decoder after the first call, including
    a decoded value of ``None``. To force re-decoding, call
    `message.clear_cache()` after `message.body` changes.

    Returns:
        The decoded value, which may be ``None``.

    Raises:
        AssertionError: If no decoder was set with `set_decoder()`.
    """
    assert self.__decoder, "You should call `set_decoder()` method first."

    decoder = self.__decoder
    cache = self.__decoded_caches
    # Test membership rather than comparing the cached value with None, so a
    # legitimately decoded None is cached and the decoder runs only once.
    if decoder not in cache:
        cache[decoder] = await decoder(self)
    return cache[decoder]
reject
async
Source code in faststream/message/message.py
async def reject(self) -> None:
    """Mark the message as rejected unless an ack status is already recorded."""
    if self.committed is not None:
        # A status was already recorded; leave it untouched.
        return

    self.committed = AckStatus.REJECTED