Skip to content

BinaryMessageFormatV1

faststream.redis.parser.binary.BinaryMessageFormatV1 #

BinaryMessageFormatV1(data, headers=None)

Bases: MessageFormat

Message format to encode into binary and parse it.

Source code in faststream/redis/parser/message.py
def __init__(
    self,
    data: bytes,
    headers: dict[str, Any] | None = None,
) -> None:
    self.data = data
    self.headers = headers or {}

IDENTITY_HEADER class-attribute instance-attribute #

IDENTITY_HEADER = b'\x89BIN\r\n\x1a\n'

data instance-attribute #

data = data

headers instance-attribute #

headers = headers or {}

encode classmethod #

encode(
    *,
    message,
    reply_to,
    headers,
    correlation_id,
    serializer=None,
)
Source code in faststream/redis/parser/binary.py
@classmethod
def encode(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: str | None,
    headers: dict[str, Any] | None,
    correlation_id: str,
    serializer: Optional["SerializerProto"] = None,
) -> bytes:
    msg = cls.build(
        message=message,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
        serializer=serializer,
    )
    headers_writer = BinaryWriter()
    for key, value in msg.headers.items():
        headers_writer.write_string(key)
        headers_writer.write_string(value)

    headers_len = len(headers_writer.data)
    writer = BinaryWriter()
    writer.write(cls.IDENTITY_HEADER)
    writer.write_short(FastStreamMessageVersion.v1.value)
    headers_start = len(writer.data) + 8
    data_start = 2 + headers_start + headers_len
    writer.write_int(headers_start)
    writer.write_int(data_start)
    writer.write_short(len(msg.headers.items()))
    writer.write(headers_writer.get_bytes())
    writer.write(msg.data)
    return writer.get_bytes()

parse classmethod #

parse(data)
Source code in faststream/redis/parser/binary.py
@classmethod
def parse(cls, data: bytes) -> tuple[bytes, dict[str, Any]]:
    headers: dict[str, Any] = {}
    final_data: bytes

    try:
        reader = BinaryReader(data)
        magic_header = reader.read_until(len(cls.IDENTITY_HEADER))
        message_version = reader.read_short()

        if (
            magic_header == cls.IDENTITY_HEADER
            and message_version == FastStreamMessageVersion.v1.value
        ):
            headers_start = reader.read_int()
            data_start = reader.read_int()
            reader.shift_offset_to(headers_start)
            header_count = reader.read_short()
            for _ in range(header_count):
                key = reader.read_string()
                value = reader.read_string()
                headers[key] = value

            reader.shift_offset_to(data_start)
            final_data = reader.read_bytes()

        else:
            parsed_data = json_loads(data)
            final_data = parsed_data["data"].encode()
            headers = parsed_data["headers"]

    except Exception:
        # Raw Redis message format
        # TODO: fallback to raw data after JSONMessageFormat removed
        return JSONMessageFormat.parse(data)

    return final_data, headers

build classmethod #

build(
    *,
    message,
    reply_to,
    headers,
    correlation_id,
    serializer=None,
)
Source code in faststream/redis/parser/message.py
@classmethod
def build(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: str | None,
    headers: dict[str, Any] | None,
    correlation_id: str,
    serializer: Optional["SerializerProto"] = None,
) -> "MessageFormat":
    payload, content_type = encode_message(message, serializer=serializer)

    headers_to_send = {
        "correlation_id": correlation_id,
    }

    if content_type:
        headers_to_send["content-type"] = content_type

    if reply_to:
        headers_to_send["reply_to"] = reply_to

    if headers is not None:
        headers_to_send.update(headers)

    return cls(
        data=payload,
        headers=headers_to_send,
    )