async def parse_message(
self,
message: Mapping[str, Any],
) -> "StreamMessage[Mapping[str, Any]]":
data, headers, batch_headers = self._parse_data(message)
id_ = gen_cor_id()
return self.msg_class(
raw_message=message,
body=data,
# Only pattern-subscribed messages have "pattern" set;
# guard here before calling match_path.
path=match_path(self.pattern, message["channel"])
if message.get("pattern")
else {},
headers=headers,
batch_headers=batch_headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
message_id=headers.get("message_id", id_),
correlation_id=headers.get("correlation_id", id_),
)