Bases: MessageFormat
Message format to encode into binary and parse it.
Source code in faststream/redis/parser.py
| def __init__(
self,
data: bytes,
headers: Optional["AnyDict"] = None,
) -> None:
self.data = data
self.headers = headers or {}
|
IDENTITY_HEADER = b'\x89BIN\r\n\x1a\n'
encode(*, message, reply_to, headers, correlation_id)
Source code in faststream/redis/parser.py
| @classmethod
def encode(
cls,
*,
message: Union[Sequence["SendableMessage"], "SendableMessage"],
reply_to: Optional[str],
headers: Optional["AnyDict"],
correlation_id: str,
) -> bytes:
msg = cls.build(
message=message,
reply_to=reply_to,
headers=headers,
correlation_id=correlation_id,
)
headers_writer = BinaryWriter()
for key, value in msg.headers.items():
headers_writer.write_string(key)
headers_writer.write_string(value)
headers_len = len(headers_writer.data)
writer = BinaryWriter()
writer.write(cls.IDENTITY_HEADER)
writer.write_short(FastStreamMessageVersion.v1.value)
headers_start = len(writer.data) + 8
data_start = 2 + headers_start + headers_len
writer.write_int(headers_start)
writer.write_int(data_start)
writer.write_short(len(msg.headers.items()))
writer.write(headers_writer.get_bytes())
writer.write(msg.data)
return writer.get_bytes()
|
Source code in faststream/redis/parser.py
| @classmethod
def parse(cls, data: bytes) -> Tuple[bytes, "AnyDict"]:
headers: AnyDict
try:
reader = BinaryReader(data)
headers = {}
magic_header = reader.read_until(len(cls.IDENTITY_HEADER))
message_version = reader.read_short()
if (
magic_header == cls.IDENTITY_HEADER
and message_version == FastStreamMessageVersion.v1.value
):
headers_start = reader.read_int()
data_start = reader.read_int()
reader.shift_offset_to(headers_start)
header_count = reader.read_short()
for _ in range(header_count):
key = reader.read_string()
value = reader.read_string()
headers[key] = value
reader.shift_offset_to(data_start)
data = reader.read_bytes()
else:
parsed_data = json_loads(data)
data = parsed_data["data"].encode()
headers = parsed_data["headers"]
except Exception:
# Raw Redis message format
data = data
headers = {}
return data, headers
|
build(*, message, reply_to, headers, correlation_id)
Source code in faststream/redis/parser.py
| @classmethod
def build(
cls,
*,
message: Union[Sequence["SendableMessage"], "SendableMessage"],
reply_to: Optional[str],
headers: Optional["AnyDict"],
correlation_id: str,
) -> "MessageFormat":
payload, content_type = encode_message(message)
headers_to_send = {
"correlation_id": correlation_id,
}
if content_type:
headers_to_send["content-type"] = content_type
if reply_to:
headers_to_send["reply_to"] = reply_to
if headers is not None:
headers_to_send.update(headers)
return cls(
data=payload,
headers=headers_to_send,
)
|