Skip to content

AsyncConfluentParser

faststream.confluent.parser.AsyncConfluentParser #

AsyncConfluentParser(is_manual=False)

A class to parse Kafka messages.

Source code in faststream/confluent/parser.py
def __init__(self, is_manual: bool = False) -> None:
    self.is_manual = is_manual
    self._consumer: ConsumerProtocol = FAKE_CONSUMER

is_manual instance-attribute #

is_manual = is_manual

parse_message async #

parse_message(message)

Parses a Kafka message.

Source code in faststream/confluent/parser.py
async def parse_message(
    self,
    message: "Message",
) -> KafkaMessage:
    """Parses a Kafka message."""
    headers = _parse_msg_headers(message.headers() or ())

    body = message.value() or b""
    offset = message.offset()
    _, timestamp = message.timestamp()

    return KafkaMessage(
        body=body,
        headers=headers,
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        message_id=f"{offset}-{timestamp}",
        correlation_id=headers.get("correlation_id"),
        raw_message=message,
        consumer=self._consumer,
        is_manual=self.is_manual,
    )

parse_message_batch async #

parse_message_batch(message)

Parses a batch of messages from a Kafka consumer.

Source code in faststream/confluent/parser.py
async def parse_message_batch(
    self,
    message: tuple["Message", ...],
) -> KafkaMessage:
    """Parses a batch of messages from a Kafka consumer."""
    body: list[Any] = []
    batch_headers: list[dict[str, str]] = []

    first = message[0]
    last = message[-1]

    for m in message:
        body.append(m.value() or b"")
        batch_headers.append(_parse_msg_headers(m.headers() or ()))

    headers = next(iter(batch_headers), {})

    _, first_timestamp = first.timestamp()

    return KafkaMessage(
        body=body,
        headers=headers,
        batch_headers=batch_headers,
        reply_to=headers.get("reply_to", ""),
        content_type=headers.get("content-type"),
        message_id=f"{first.offset()}-{last.offset()}-{first_timestamp}",
        correlation_id=headers.get("correlation_id"),
        raw_message=message,
        consumer=self._consumer,
        is_manual=self.is_manual,
    )

decode_message async #

decode_message(msg)

Decodes a message.

Source code in faststream/confluent/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[Message]",
) -> "DecodedMessage":
    """Decodes a message."""
    return decode_message(msg)

decode_message_batch async #

decode_message_batch(msg)

Decode a batch of messages.

Source code in faststream/confluent/parser.py
async def decode_message_batch(
    self,
    msg: "StreamMessage[tuple[Message, ...]]",
) -> "DecodedMessage":
    """Decode a batch of messages."""
    return [decode_message(await self.parse_message(m)) for m in msg.raw_message]