Skip to content

BinaryReader

faststream.redis.parser.BinaryReader #

BinaryReader(data)
Source code in faststream/redis/parser.py
def __init__(self, data: bytes) -> None:
    """Wrap ``data`` so it can be consumed sequentially from its first byte."""
    # Index into ``data`` at which the next read starts.
    self.offset = 0
    self.data = data

data instance-attribute #

data = data

offset instance-attribute #

offset = 0

read_until #

read_until(offset)
Source code in faststream/redis/parser.py
def read_until(self, offset: int) -> bytes:
    """Return the next ``offset`` bytes and advance the cursor past them.

    Despite its name, ``offset`` is a byte count relative to the current
    position, not an absolute index. The cursor always moves forward by
    ``offset``, even when the slice taken from ``data`` comes back shorter
    because fewer bytes remain.
    """
    start = self.offset
    stop = start + offset
    chunk = self.data[start:stop]
    self.offset = stop
    return chunk

shift_offset_to #

shift_offset_to(offset)
Source code in faststream/redis/parser.py
def shift_offset_to(self, offset: int) -> None:
    """Move the read cursor to the absolute position ``offset``.

    The value is stored as given; no bounds check against ``data`` is made.
    """
    target = offset
    self.offset = target

read_short #

read_short()
Source code in faststream/redis/parser.py
def read_short(self) -> int:
    """Decode the next two bytes with format ``">H"`` and step over them.

    With ``struct.unpack`` (presumably what ``unpack`` refers to here) that
    format is a big-endian unsigned 16-bit integer. The cursor only moves
    when decoding succeeds.
    """
    start = self.offset
    end = start + 2
    (value,) = unpack(">H", self.data[start:end])
    self.offset = end
    return int(value)

read_int #

read_int()
Source code in faststream/redis/parser.py
def read_int(self) -> int:
    """Decode the next four bytes with format ``">I"`` and step over them.

    With ``struct.unpack`` (presumably what ``unpack`` refers to here) that
    format is a big-endian unsigned 32-bit integer. The cursor only moves
    when decoding succeeds.
    """
    width = 4
    begin = self.offset
    (number,) = unpack(">I", self.data[begin : begin + width])
    self.offset = begin + width
    return int(number)

read_string #

read_string()
Source code in faststream/redis/parser.py
def read_string(self) -> str:
    """Read a length-prefixed string and advance past it.

    The length comes from ``read_short()``; that many bytes are then taken
    from ``data`` and decoded with ``bytes.decode()``'s default encoding.
    """
    length = self.read_short()
    # Capture the cursor only after the length prefix has been consumed.
    start = self.offset
    end = start + length
    raw = self.data[start:end]
    self.offset = end
    return raw.decode()

read_bytes #

read_bytes()
Source code in faststream/redis/parser.py
def read_bytes(self) -> bytes:
    """Return everything from the current offset to the end of ``data``.

    The cursor is left where it was.
    """
    remaining = self.data[self.offset :]
    return remaining