Skip to content

BinaryMessageFormatV1

faststream.redis.parser.BinaryMessageFormatV1 #

BinaryMessageFormatV1(data, headers=None)

Bases: MessageFormat

Message format to encode into binary and parse it.

Source code in faststream/redis/parser.py
def __init__(
    self,
    data: bytes,
    headers: Optional["AnyDict"] = None,
) -> None:
    self.data = data
    self.headers = headers or {}

IDENTITY_HEADER class-attribute instance-attribute #

IDENTITY_HEADER = b'\x89BIN\r\n\x1a\n'

data instance-attribute #

data = data

headers instance-attribute #

headers = headers or {}

encode classmethod #

encode(*, message, reply_to, headers, correlation_id)
Source code in faststream/redis/parser.py
@classmethod
def encode(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: Optional[str],
    headers: Optional["AnyDict"],
    correlation_id: str,
) -> bytes:
    msg = cls.build(
        message=message,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
    )
    headers_writer = BinaryWriter()
    for key, value in msg.headers.items():
        headers_writer.write_string(key)
        headers_writer.write_string(value)

    headers_len = len(headers_writer.data)
    writer = BinaryWriter()
    writer.write(cls.IDENTITY_HEADER)
    writer.write_short(FastStreamMessageVersion.v1.value)
    headers_start = len(writer.data) + 8
    data_start = 2 + headers_start + headers_len
    writer.write_int(headers_start)
    writer.write_int(data_start)
    writer.write_short(len(msg.headers.items()))
    writer.write(headers_writer.get_bytes())
    writer.write(msg.data)
    return writer.get_bytes()

parse classmethod #

parse(data)
Source code in faststream/redis/parser.py
@classmethod
def parse(cls, data: bytes) -> Tuple[bytes, "AnyDict"]:
    headers: AnyDict
    try:
        reader = BinaryReader(data)
        headers = {}
        magic_header = reader.read_until(len(cls.IDENTITY_HEADER))
        message_version = reader.read_short()
        if (
            magic_header == cls.IDENTITY_HEADER
            and message_version == FastStreamMessageVersion.v1.value
        ):
            headers_start = reader.read_int()
            data_start = reader.read_int()
            reader.shift_offset_to(headers_start)
            header_count = reader.read_short()
            for _ in range(header_count):
                key = reader.read_string()
                value = reader.read_string()
                headers[key] = value

            reader.shift_offset_to(data_start)
            data = reader.read_bytes()
        else:
            parsed_data = json_loads(data)
            data = parsed_data["data"].encode()
            headers = parsed_data["headers"]
    except Exception:
        # Raw Redis message format
        data = data
        headers = {}
    return data, headers

build classmethod #

build(*, message, reply_to, headers, correlation_id)
Source code in faststream/redis/parser.py
@classmethod
def build(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: Optional[str],
    headers: Optional["AnyDict"],
    correlation_id: str,
) -> "MessageFormat":
    payload, content_type = encode_message(message)

    headers_to_send = {
        "correlation_id": correlation_id,
    }

    if content_type:
        headers_to_send["content-type"] = content_type

    if reply_to:
        headers_to_send["reply_to"] = reply_to

    if headers is not None:
        headers_to_send.update(headers)

    return cls(
        data=payload,
        headers=headers_to_send,
    )