A class to parse Kafka messages.
Source code in faststream/kafka/parser.py
def __init__(
    self,
    msg_class: type[KafkaMessage],
    regex: Optional["Pattern[str]"],
) -> None:
    """Store the configuration used when parsing incoming records.

    Args:
        msg_class: Callable/class that ``parse_message`` invokes to build
            the parsed message object.
        regex: Optional compiled pattern matched against the topic name in
            ``get_path``; ``None`` means no path parameters are extracted.
    """
    # Start with the placeholder consumer; ``parse_message`` falls back to
    # this value when the raw record carries no ``consumer`` attribute.
    self._consumer: ConsumerProtocol = FAKE_CONSUMER
    self.regex = regex
    self.msg_class = msg_class
|
msg_class
instance-attribute
parse_message
async
Parses a Kafka message.
Source code in faststream/kafka/parser.py
async def parse_message(
    self,
    message: Union["ConsumerRecord", "KafkaRawMessage"],
) -> "StreamMessage[ConsumerRecord]":
    """Wrap a raw Kafka record into an instance of ``self.msg_class``.

    Args:
        message: The raw record. This method reads its ``headers``
            (iterated as key/value pairs), ``value``, ``offset``,
            ``timestamp`` and ``topic`` attributes, plus an optional
            ``consumer`` attribute.

    Returns:
        Whatever ``self.msg_class(...)`` returns for the collected fields.
    """
    factory = self.msg_class

    # Header values are decoded with the default codec of ``.decode()``.
    decoded_headers = {}
    for key, raw_value in message.headers:
        decoded_headers[key] = raw_value.decode()

    # A falsy payload (e.g. ``None``) is normalised to empty bytes.
    payload = message.value or b""
    reply_target = decoded_headers.get("reply_to", "")
    content_kind = decoded_headers.get("content-type")
    # The identifier is composed from the record offset and timestamp.
    identifier = f"{message.offset}-{message.timestamp}"
    correlation = decoded_headers.get("correlation_id")
    topic_params = self.get_path(message.topic)
    # Prefer a consumer attached to the record; otherwise use the parser's.
    owner = getattr(message, "consumer", self._consumer)

    return factory(
        body=payload,
        headers=decoded_headers,
        reply_to=reply_target,
        content_type=content_kind,
        message_id=identifier,
        correlation_id=correlation,
        raw_message=message,
        path=topic_params,
        consumer=owner,
    )
|
decode_message
async
Decodes a message.
Source code in faststream/kafka/parser.py
async def decode_message(
    self,
    msg: "StreamMessage[ConsumerRecord]",
) -> "DecodedMessage":
    """Decode an already-parsed message.

    Args:
        msg: The message object to decode.

    Returns:
        The result of the ``decode_message`` helper applied to ``msg``.
    """
    # Within a method body this bare name is looked up outside the class
    # namespace, so it refers to the shared helper rather than this method
    # (helper definition not shown here -- confirm against the module imports).
    decoded = decode_message(msg)
    return decoded
|
get_path
Source code in faststream/kafka/parser.py
def get_path(self, topic: str) -> dict[str, str]:
    """Extract named path parameters from a topic name.

    Args:
        topic: Topic name to match against ``self.regex``.

    Returns:
        The named groups of the match, or an empty dict when no pattern is
        configured or the pattern does not match at the start of ``topic``.
    """
    pattern = self.regex
    if not pattern:
        # No pattern configured: nothing to extract.
        return {}

    found = pattern.match(topic)
    if not found:
        return {}

    return found.groupdict()
|