Skip to content

BinaryWriter

faststream.redis.parser.BinaryWriter #

BinaryWriter()
Source code in faststream/redis/parser.py
def __init__(self) -> None:
    """Create a writer whose output buffer starts out empty."""
    # A bytearray is used so later writes can grow the buffer in place.
    self.data: bytearray = bytearray()

data instance-attribute #

data = bytearray()

write #

write(data)
Source code in faststream/redis/parser.py
def write(self, data: bytes) -> None:
    """Append ``data`` to the end of the internal buffer.

    Args:
        data: Raw bytes to add after whatever has been written so far.
    """
    buffer = self.data
    buffer.extend(data)

write_short #

write_short(number)
Source code in faststream/redis/parser.py
def write_short(self, number: int) -> None:
    """Append ``number`` as a 2-byte big-endian unsigned integer.

    Args:
        number: Value to encode; must fit the ``">H"`` struct format
            (0..65535), otherwise ``struct.error`` is raised by ``pack``.
    """
    self.write(pack(">H", number))

write_int #

write_int(number)
Source code in faststream/redis/parser.py
def write_int(self, number: int) -> None:
    """Append ``number`` as a 4-byte big-endian unsigned integer.

    Args:
        number: Value to encode; must fit the ``">I"`` struct format
            (0..2**32 - 1), otherwise ``struct.error`` is raised by ``pack``.
    """
    encoded = pack(">I", number)
    self.write(encoded)

write_string #

write_string(data)
Source code in faststream/redis/parser.py
def write_string(self, data: Union[str, bytes]) -> None:
    """Append a length-prefixed string to the buffer.

    The payload is preceded by its length, written with ``write_short``.
    The length is measured on the encoded bytes rather than on the ``str``,
    so text containing multi-byte characters gets a prefix that matches
    the number of bytes actually written.

    Args:
        data: Text (encoded with ``str.encode()`` defaults, i.e. UTF-8) or
            raw bytes to write as-is.
    """
    # Encode before measuring: len() of a str counts code points, which is
    # smaller than the byte count for any non-ASCII character.
    payload = data if isinstance(data, bytes) else data.encode()
    self.write_short(len(payload))
    self.write(payload)

get_bytes #

get_bytes()
Source code in faststream/redis/parser.py
def get_bytes(self) -> bytes:
    """Return everything written so far as an immutable ``bytes`` copy."""
    snapshot = bytes(self.data)
    return snapshot