Bases: MessageFormat
Message format that encodes messages into a binary representation and parses them back.
Source code in faststream/redis/parser/message.py
| def __init__(
self,
data: bytes,
headers: dict[str, Any] | None = None,
) -> None:
self.data = data
self.headers = headers or {}
|
IDENTITY_HEADER = b'\x89BIN\r\n\x1a\n'
encode(
*,
message,
reply_to,
headers,
correlation_id,
serializer=None,
)
Source code in faststream/redis/parser/binary.py
@classmethod
def encode(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: str | None,
    headers: dict[str, Any] | None,
    correlation_id: str,
    serializer: Optional["SerializerProto"] = None,
) -> bytes:
    """Build a message and serialize it into the binary envelope.

    The output is written in this order: ``IDENTITY_HEADER``, the format
    version, the offset of the headers section, the offset of the payload,
    the number of headers, every header as a key string followed by a
    value string, and finally the payload bytes.

    Args:
        message: Message body (or sequence of bodies) passed to ``build``.
        reply_to: Optional reply destination passed to ``build``.
        headers: Optional extra headers passed to ``build``.
        correlation_id: Correlation identifier passed to ``build``.
        serializer: Optional serializer passed to ``build``.

    Returns:
        The bytes produced by the writer for the whole envelope.
    """
    built = cls.build(
        message=message,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
        serializer=serializer,
    )

    # Headers go into their own writer first so that the size of the
    # headers section is known before the offsets are emitted.
    header_block = BinaryWriter()
    for name, content in built.headers.items():
        header_block.write_string(name)
        header_block.write_string(content)
    header_block_size = len(header_block.data)

    out = BinaryWriter()
    out.write(cls.IDENTITY_HEADER)
    out.write_short(FastStreamMessageVersion.v1.value)

    # +8 leaves room for the two offset fields written right below
    # (presumably 4 bytes each -- confirm against BinaryWriter.write_int).
    headers_offset = len(out.data) + 8
    # +2 skips the header-count field that opens the headers section
    # (presumably a 2-byte short -- confirm against BinaryWriter.write_short).
    payload_offset = headers_offset + header_block_size + 2

    out.write_int(headers_offset)
    out.write_int(payload_offset)
    out.write_short(len(built.headers))
    out.write(header_block.get_bytes())
    out.write(built.data)
    return out.get_bytes()
|
Source code in faststream/redis/parser/binary.py
@classmethod
def parse(cls, data: bytes) -> tuple[bytes, dict[str, Any]]:
    """Split an incoming message into its payload and headers.

    The message is first read as the binary envelope. If the identity
    header or the version does not match, it is read as a JSON object with
    ``"data"`` and ``"headers"`` keys. Any exception raised along the way
    hands the message over to ``JSONMessageFormat.parse``.

    Args:
        data: Raw bytes of the received message.

    Returns:
        A ``(payload, headers)`` tuple.
    """
    try:
        reader = BinaryReader(data)
        signature = reader.read_until(len(cls.IDENTITY_HEADER))
        version = reader.read_short()

        is_binary_v1 = (
            signature == cls.IDENTITY_HEADER
            and version == FastStreamMessageVersion.v1.value
        )
        if not is_binary_v1:
            # Not the binary envelope: read it as a JSON document instead.
            document = json_loads(data)
            return document["data"].encode(), document["headers"]

        headers_offset = reader.read_int()
        payload_offset = reader.read_int()

        # Jump to the headers section: a count followed by key/value pairs.
        reader.shift_offset_to(headers_offset)
        parsed_headers: dict[str, Any] = {}
        remaining = reader.read_short()
        while remaining > 0:
            name = reader.read_string()
            parsed_headers[name] = reader.read_string()
            remaining -= 1

        # Everything from the payload offset onwards is the message body.
        reader.shift_offset_to(payload_offset)
        return reader.read_bytes(), parsed_headers
    except Exception:
        # Raw Redis message format
        # TODO: fallback to raw data after JSONMessageFormat removed
        return JSONMessageFormat.parse(data)
|
build(
*,
message,
reply_to,
headers,
correlation_id,
serializer=None,
)
Source code in faststream/redis/parser/message.py
@classmethod
def build(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: str | None,
    headers: dict[str, Any] | None,
    correlation_id: str,
    serializer: Optional["SerializerProto"] = None,
) -> "MessageFormat":
    """Create a message-format instance from a message and its metadata.

    The message is passed through ``encode_message`` to obtain the payload
    and its content type. The resulting headers always contain
    ``"correlation_id"``; ``"content-type"`` and ``"reply_to"`` are added
    only when their values are truthy. Caller-supplied ``headers`` are
    merged last, so they override the generated entries on key clashes.

    Args:
        message: Message body (or sequence of bodies) to encode.
        reply_to: Optional reply destination.
        headers: Optional extra headers merged over the generated ones.
        correlation_id: Correlation identifier stored in the headers.
        serializer: Optional serializer forwarded to ``encode_message``.

    Returns:
        An instance of ``cls`` holding the payload and the merged headers.
    """
    body, body_content_type = encode_message(message, serializer=serializer)

    outgoing = {"correlation_id": correlation_id}

    # Optional entries, kept in the order they must appear in the mapping.
    optional_entries = (
        ("content-type", body_content_type),
        ("reply_to", reply_to),
    )
    for header_name, header_value in optional_entries:
        if header_value:
            outgoing[header_name] = header_value

    # User headers are applied last so they take precedence.
    if headers is not None:
        outgoing.update(headers)

    return cls(data=body, headers=outgoing)
|