Skip to content

KafkaMessage

faststream.confluent.message.KafkaMessage #

KafkaMessage(*args, consumer, is_manual=False, **kwargs)

Bases: StreamMessage[Message | tuple[Message, ...]]

Represents a Kafka message in the FastStream framework.

This class extends StreamMessage and is specialized for handling confluent_kafka.Message objects.

Source code in faststream/confluent/message.py
def __init__(
    self,
    *args: Any,
    consumer: ConsumerProtocol,
    is_manual: bool = False,
    **kwargs: Any,
) -> None:
    """Build the message and record how it is going to be acknowledged.

    Args:
        *args: Positional arguments passed through to the parent initializer.
        consumer: Consumer object kept on the instance as `self.consumer`.
        is_manual: Flag kept on the instance as `self.is_manual`. When it is
            falsy, `self.committed` is set to `AckStatus.ACKED` right away.
        **kwargs: Keyword arguments passed through to the parent initializer.
    """
    super().__init__(*args, **kwargs)

    self.consumer, self.is_manual = consumer, is_manual

    if is_manual:
        # Manual mode: leave `committed` exactly as the parent initializer set it.
        return

    self.committed = AckStatus.ACKED

consumer instance-attribute #

consumer = consumer

is_manual instance-attribute #

is_manual = is_manual

committed instance-attribute #

committed = ACKED

raw_message instance-attribute #

raw_message = raw_message

body instance-attribute #

body = body

reply_to instance-attribute #

reply_to = reply_to

content_type instance-attribute #

content_type = content_type

source_type instance-attribute #

source_type = source_type

headers instance-attribute #

headers = headers or {}

batch_headers instance-attribute #

batch_headers = batch_headers or []

path instance-attribute #

path = path or {}

correlation_id instance-attribute #

correlation_id = correlation_id or str(uuid4())

message_id instance-attribute #

message_id = message_id or correlation_id

processed instance-attribute #

processed = False

ack async #

ack()

Acknowledge the Kafka message.

Source code in faststream/confluent/message.py
async def ack(self) -> None:
    """Acknowledge the Kafka message.

    Commits through `self.consumer` when the message is in manual mode and
    `self.committed` is still falsy, then delegates to the parent `ack()`.
    """
    needs_commit = self.is_manual and not self.committed
    if needs_commit:
        await self.consumer.commit()

    await super().ack()

nack async #

nack()

Reject the Kafka message.

Source code in faststream/confluent/message.py
async def nack(self) -> None:
    """Reject the Kafka message.

    In manual mode, while `self.committed` is still falsy, the consumer is
    asked to seek to the topic/partition/offset of the raw message (the first
    element when `self.raw_message` is a tuple). The parent `nack()` is then
    awaited in every case.
    """
    if self.is_manual and not self.committed:
        if isinstance(self.raw_message, tuple):
            # Tuple payload: only its first element is used as the seek target.
            target: Message = self.raw_message[0]
        else:
            target = self.raw_message

        topic = target.topic()
        partition = target.partition()
        offset = target.offset()
        await self.consumer.seek(topic=topic, partition=partition, offset=offset)

    await super().nack()

set_decoder #

set_decoder(decoder)
Source code in faststream/message/message.py
def set_decoder(self, decoder: "AsyncCallable") -> None:
    """Store the decoder that `decode()` awaits to produce the decoded value.

    Args:
        decoder: Callable kept in the private `__decoder` attribute; any
            previously stored decoder is replaced.
    """
    self.__decoder = decoder

clear_cache #

clear_cache()
Source code in faststream/message/message.py
def clear_cache(self) -> None:
    """Remove every entry from the private cache of decoded results."""
    self.__decoded_caches.clear()

decode async #

decode()

Decode the message using the lazily configured decoder.

Returns the cached result after the first call. To force a fresh decode, call message.clear_cache() after message.body changes.

Source code in faststream/message/message.py
async def decode(self) -> Optional["Any"]:
    """Decode the message with the configured decoder, caching the result.

    The decoded value is cached per decoder, so repeated calls return the
    stored value instead of decoding again -- including when that value is
    ``None``. To force a fresh decode, call `message.clear_cache()` after
    `message.body` changes.

    Returns:
        The value produced by awaiting the decoder with this message.

    Raises:
        AssertionError: If no decoder has been set via `set_decoder()`.
    """
    decoder = self.__decoder
    if not decoder:
        # Explicit raise instead of `assert`, so the guard survives `python -O`
        # while callers still see the same exception type and message.
        raise AssertionError("You should call `set_decoder()` method first.")

    cache = self.__decoded_caches
    # Membership test rather than an `is None` comparison: a decoder that
    # legitimately returns `None` must be cached too, not re-run on every call.
    if decoder in cache:
        return cache[decoder]

    result = cache[decoder] = await decoder(self)
    return result

reject async #

reject()
Source code in faststream/message/message.py
async def reject(self) -> None:
    """Mark the message as rejected if no ack status has been recorded yet.

    Sets `self.committed` to `AckStatus.REJECTED` only while it is still
    `None`; an already-set status is left untouched.
    """
    if self.committed is None:
        self.committed = AckStatus.REJECTED