Skip to content

MessageFormat

faststream.redis.parser.MessageFormat #

MessageFormat(data, headers=None)

Bases: ABC

An abstract base class representing a raw Redis message: the payload bytes plus a dictionary of headers.

Source code in faststream/redis/parser.py
def __init__(
    self,
    data: bytes,
    headers: Optional["AnyDict"] = None,
) -> None:
    """Store the raw payload together with its headers.

    Args:
        data: Payload bytes of the message.
        headers: Headers to attach. ``None`` (or any other falsy value,
            such as an empty dict) is replaced by a new empty dict.
    """
    # A new dict is created per instance whenever no usable headers are given.
    self.headers = headers if headers else {}
    self.data = data

data instance-attribute #

data = data

headers instance-attribute #

headers = headers or {}

build classmethod #

build(*, message, reply_to, headers, correlation_id)
Source code in faststream/redis/parser.py
@classmethod
def build(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: Optional[str],
    headers: Optional["AnyDict"],
    correlation_id: str,
) -> "MessageFormat":
    """Create an instance from a sendable message and its metadata.

    The message is passed through ``encode_message`` to obtain the payload
    and its content type; the result is wrapped in ``cls`` together with the
    assembled headers.

    Args:
        message: Message (or sequence of messages) to encode.
        reply_to: Stored under the ``"reply_to"`` header when truthy.
        headers: Extra headers. They are merged last, so they override the
            generated ``"correlation_id"``, ``"content-type"`` and
            ``"reply_to"`` entries on key collision.
        correlation_id: Always stored under the ``"correlation_id"`` header.

    Returns:
        A new ``cls`` instance carrying the encoded payload and headers.
    """
    body, mime = encode_message(message)

    outgoing = {"correlation_id": correlation_id}

    # Optional generated headers are only emitted when they carry a value;
    # the tuple order keeps the original insertion order of the keys.
    for key, value in (("content-type", mime), ("reply_to", reply_to)):
        if value:
            outgoing[key] = value

    # Caller-supplied headers win over the generated ones.
    if headers is not None:
        outgoing.update(headers)

    return cls(data=body, headers=outgoing)

encode abstractmethod classmethod #

encode(*, message, reply_to, headers, correlation_id)
Source code in faststream/redis/parser.py
@classmethod
@abstractmethod
def encode(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: Optional[str],
    headers: Optional["AnyDict"],
    correlation_id: str,
) -> bytes:
    """Serialize a message and its metadata into bytes.

    Abstract: this base implementation only raises, so concrete formats
    must override it. The exact wire layout is defined by the subclass.

    Args:
        message: Message (or sequence of messages) to serialize.
        reply_to: Optional reply destination.
        headers: Optional extra headers.
        correlation_id: Correlation identifier of the message.

    Returns:
        The serialized message as bytes (per the annotation).

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

parse abstractmethod classmethod #

parse(data)
Source code in faststream/redis/parser.py
@classmethod
@abstractmethod
def parse(cls, data: bytes) -> Tuple[bytes, "AnyDict"]:
    """Split raw message bytes into a payload and a headers dict.

    Abstract: this base implementation only raises, so concrete formats
    must override it.
    NOTE(review): presumably the inverse of ``encode`` — confirm against
    the concrete subclasses.

    Args:
        data: Raw bytes to parse.

    Returns:
        A ``(payload, headers)`` tuple (per the annotation).

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError