Skip to content

MessageFormat

faststream.redis.parser.MessageFormat #

MessageFormat(data, headers=None)

Bases: ABC

A class to represent a raw Redis message.

Source code in faststream/redis/parser/message.py
def __init__(
    self,
    data: bytes,
    headers: dict[str, Any] | None = None,
) -> None:
    self.data = data
    self.headers = headers or {}

data instance-attribute #

data = data

headers instance-attribute #

headers = headers or {}

build classmethod #

build(
    *,
    message,
    reply_to,
    headers,
    correlation_id,
    serializer=None,
)
Source code in faststream/redis/parser/message.py
@classmethod
def build(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: str | None,
    headers: dict[str, Any] | None,
    correlation_id: str,
    serializer: Optional["SerializerProto"] = None,
) -> "MessageFormat":
    payload, content_type = encode_message(message, serializer=serializer)

    headers_to_send = {
        "correlation_id": correlation_id,
    }

    if content_type:
        headers_to_send["content-type"] = content_type

    if reply_to:
        headers_to_send["reply_to"] = reply_to

    if headers is not None:
        headers_to_send.update(headers)

    return cls(
        data=payload,
        headers=headers_to_send,
    )

encode abstractmethod classmethod #

encode(
    *,
    message,
    reply_to,
    headers,
    correlation_id,
    serializer=None,
)
Source code in faststream/redis/parser/message.py
@classmethod
@abstractmethod
def encode(
    cls,
    *,
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    reply_to: str | None,
    headers: dict[str, Any] | None,
    correlation_id: str,
    serializer: Optional["SerializerProto"] = None,
) -> bytes:
    raise NotImplementedError

parse abstractmethod classmethod #

parse(data)
Source code in faststream/redis/parser/message.py
@classmethod
@abstractmethod
def parse(cls, data: bytes) -> tuple[bytes, dict[str, Any]]:
    raise NotImplementedError