Bases: MessageFormat
Message format to encode into JSON and parse it.
Source code in faststream/redis/parser/message.py
| def __init__(
self,
data: bytes,
headers: dict[str, Any] | None = None,
) -> None:
self.data = data
self.headers = headers or {}
|
encode(
*,
message,
reply_to,
headers,
correlation_id,
serializer=None,
)
Source code in faststream/redis/parser/json.py
| @classmethod
def encode(
cls,
*,
message: Union[Sequence["SendableMessage"], "SendableMessage"],
reply_to: str | None,
headers: dict[str, Any] | None,
correlation_id: str,
serializer: Optional["SerializerProto"] = None,
) -> bytes:
msg = cls.build(
message=message,
reply_to=reply_to,
headers=headers,
correlation_id=correlation_id,
serializer=serializer,
)
return dump_json({
"data": msg.data,
"headers": msg.headers,
})
|
Source code in faststream/redis/parser/json.py
| @classmethod
def parse(cls, data: bytes) -> tuple[bytes, dict[str, Any]]:
headers: dict[str, Any]
try:
parsed_data = json_loads(data)
final_data = parsed_data["data"].encode()
headers = parsed_data.get("headers", {})
except Exception:
# Raw Redis message format
final_data = data
headers = {}
return final_data, headers
|
build(
*,
message,
reply_to,
headers,
correlation_id,
serializer=None,
)
Source code in faststream/redis/parser/message.py
| @classmethod
def build(
cls,
*,
message: Union[Sequence["SendableMessage"], "SendableMessage"],
reply_to: str | None,
headers: dict[str, Any] | None,
correlation_id: str,
serializer: Optional["SerializerProto"] = None,
) -> "MessageFormat":
payload, content_type = encode_message(message, serializer=serializer)
headers_to_send = {
"correlation_id": correlation_id,
}
if content_type:
headers_to_send["content-type"] = content_type
if reply_to:
headers_to_send["reply_to"] = reply_to
if headers is not None:
headers_to_send.update(headers)
return cls(
data=payload,
headers=headers_to_send,
)
|