Skip to content

encode_message

faststream.message.utils.encode_message #

encode_message(msg, serializer)

Encodes a message.

Source code in faststream/message/utils.py
def encode_message(
    msg: Union[Sequence["SendableMessage"], "SendableMessage"],
    serializer: Optional["SerializerProto"],
) -> tuple[bytes, str | None]:
    """Encodes a message."""
    if msg is None:
        return (
            b"",
            None,
        )

    if isinstance(msg, bytes):
        return (
            msg,
            None,
        )

    if isinstance(msg, str):
        return (
            msg.encode(),
            ContentTypes.TEXT.value,
        )

    if serializer is not None:
        return (
            serializer.encode(msg),
            ContentTypes.JSON.value,
        )

    return (
        json_dumps(msg),
        ContentTypes.JSON.value,
    )