Skip to content

AioKafkaBatchParser

faststream.kafka.parser.AioKafkaBatchParser #

AioKafkaBatchParser(
    msg_class: type[KafkaMessage],
    regex: Optional[Pattern[str]],
)

Bases: AioKafkaParser

Source code in faststream/kafka/parser.py
def __init__(
    self,
    msg_class: type[KafkaMessage],
    regex: Optional["Pattern[str]"],
) -> None:
    """Store the parser configuration.

    Args:
        msg_class: Callable message class kept on the instance as
            ``msg_class``.
        regex: Optional compiled pattern kept on the instance as ``regex``.
    """
    self.msg_class, self.regex = msg_class, regex

    # Every new parser starts with the shared FAKE_CONSUMER object.
    # NOTE(review): presumably swapped for a real consumer later — confirm.
    self._consumer: ConsumerProtocol = FAKE_CONSUMER

msg_class instance-attribute #

msg_class = msg_class

regex instance-attribute #

regex = regex

parse_batch async #

parse_batch(
    message: tuple[ConsumerRecord, ...],
) -> StreamMessage[tuple[ConsumerRecord, ...]]

Parses a batch of messages from a Kafka consumer.

Source code in faststream/kafka/parser.py
async def parse_batch(
    self,
    message: tuple["ConsumerRecord", ...],
) -> "StreamMessage[tuple[ConsumerRecord, ...]]":
    """Wrap a batch of consumer records into a single stream message.

    The body is the list of record values (a falsy value becomes ``b""``)
    and ``batch_headers`` holds one decoded header dict per record. The
    headers of the first record double as the batch-level headers, from
    which ``reply_to``, ``content-type`` and ``correlation_id`` are read.

    Args:
        message: Records of the batch. The first and last records are
            indexed directly, so an empty batch fails on that lookup.

    Returns:
        The object produced by calling ``self.msg_class``; its
        ``message_id`` is ``"<first offset>-<last offset>-<first timestamp>"``.
    """
    head, tail = message[0], message[-1]

    payloads: list[Any] = [record.value or b"" for record in message]
    per_record_headers: list[dict[str, str]] = [
        {key: raw.decode() for key, raw in record.headers} for record in message
    ]

    # Batch-level metadata is taken from the first record's headers.
    lead_headers = per_record_headers[0] if per_record_headers else {}

    return self.msg_class(
        body=payloads,
        headers=lead_headers,
        batch_headers=per_record_headers,
        reply_to=lead_headers.get("reply_to", ""),
        content_type=lead_headers.get("content-type"),
        message_id=f"{head.offset}-{tail.offset}-{head.timestamp}",
        correlation_id=lead_headers.get("correlation_id"),
        raw_message=message,
        path=match_path(self.regex, head.topic),
        consumer=self._consumer,
    )

decode_batch async #

decode_batch(
    msg: StreamMessage[tuple[ConsumerRecord, ...]],
) -> DecodedMessage

Decode a batch of messages.

Source code in faststream/kafka/parser.py
async def decode_batch(
    self,
    msg: "StreamMessage[tuple[ConsumerRecord, ...]]",
) -> "DecodedMessage":
    """Decode every record of a parsed batch.

    Each entry of ``msg.raw_message`` is parsed with the parent class's
    ``parse_message`` and the result is passed to the module-level
    ``decode_message``.

    Args:
        msg: Batch message whose ``raw_message`` holds the records.

    Returns:
        A list with one decoded item per record, in record order.
    """
    # Resolve the parent-class proxy once, up front, rather than inside
    # the per-record iteration.
    parent = cast("AioKafkaParser", super())

    decoded = []
    for record in msg.raw_message:
        parsed = await parent.parse_message(record)
        decoded.append(decode_message(parsed))
    return decoded

parse_message async #

parse_message(
    message: Union[ConsumerRecord, KafkaRawMessage],
) -> StreamMessage[ConsumerRecord]

Parses a Kafka message.

Source code in faststream/kafka/parser.py
async def parse_message(
    self,
    message: Union["ConsumerRecord", "KafkaRawMessage"],
) -> "StreamMessage[ConsumerRecord]":
    """Wrap a single record into a stream message.

    Header values are decoded with ``.decode()``; ``reply_to``,
    ``content-type`` and ``correlation_id`` are read from the decoded
    headers. A falsy record value is replaced by ``b""``.

    Args:
        message: The record to wrap. If it carries a ``consumer``
            attribute that consumer is used; otherwise the parser's own
            ``_consumer`` is passed along.

    Returns:
        The object produced by calling ``self.msg_class``; its
        ``message_id`` is ``"<offset>-<timestamp>"``.
    """
    decoded_headers = {}
    for key, raw_value in message.headers:
        decoded_headers[key] = raw_value.decode()

    payload = message.value or b""
    identifier = f"{message.offset}-{message.timestamp}"
    topic_path = match_path(self.regex, message.topic)
    owner = getattr(message, "consumer", self._consumer)

    return self.msg_class(
        body=payload,
        headers=decoded_headers,
        reply_to=decoded_headers.get("reply_to", ""),
        content_type=decoded_headers.get("content-type"),
        message_id=identifier,
        correlation_id=decoded_headers.get("correlation_id"),
        raw_message=message,
        path=topic_path,
        consumer=owner,
    )

decode_message async #

decode_message(
    msg: StreamMessage[ConsumerRecord],
) -> DecodedMessage

Decodes a message.

Source code in faststream/kafka/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[ConsumerRecord]",
) -> "DecodedMessage":
    """Decode a single parsed message.

    Args:
        msg: The message to decode.

    Returns:
        Whatever the module-level ``decode_message`` helper returns for
        ``msg``; this method only delegates to it.
    """
    decoded = decode_message(msg)
    return decoded