Skip to content

build_message

faststream.redis.testing.build_message

build_message(
    message,
    *,
    correlation_id,
    message_format,
    reply_to="",
    headers=None,
)
Source code in faststream/redis/testing.py
def build_message(
    message: Union[Sequence["SendableMessage"], "SendableMessage"],
    *,
    correlation_id: str,
    message_format: Type["MessageFormat"],
    reply_to: str = "",
    headers: Optional["AnyDict"] = None,
) -> bytes:
    """Encode ``message`` using the supplied message format.

    This is a thin wrapper: all arguments are forwarded by keyword to
    ``message_format.encode`` and its result is returned unchanged.

    Args:
        message: The payload (or sequence of payloads) to encode.
        correlation_id: Correlation identifier passed through to the encoder.
        message_format: Class providing the ``encode`` callable to use.
        reply_to: Reply destination passed through to the encoder;
            defaults to an empty string.
        headers: Optional headers mapping passed through to the encoder.

    Returns:
        Whatever ``message_format.encode`` produces (annotated as ``bytes``).
    """
    return message_format.encode(
        message=message,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
    )