Skip to content

decode_message

faststream.message.decode_message

decode_message(message)

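Decodes a message body according to its content type: text bodies are decoded to a string, JSON bodies are parsed, and when no content type is set JSON parsing is attempted, returning the raw body if that fails.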

Source code in faststream/message/utils.py
def decode_message(message: "StreamMessage[Any]") -> "DecodedMessage":
    """Decode the body of a message according to its content type.

    The payload is taken from ``message.body``; if the object has no ``body``
    attribute, the object itself is used as the payload. The ``content_type``
    attribute (treated as unset when missing or falsy) selects the decoding:

    * ``ContentTypes.TEXT`` -- the payload is decoded via its ``decode()``
      method.
    * ``ContentTypes.JSON`` -- the payload is parsed with ``json_loads``.
    * any other recognized content type -- the payload is returned as is.
    * no content type -- JSON parsing is attempted; if it fails with
      ``json.JSONDecodeError`` or ``UnicodeDecodeError`` the payload is
      returned unchanged.

    Args:
        message: The incoming message, or a raw payload.

    Returns:
        The decoded payload, or the original payload when no decoding applies.
    """
    payload: Any = getattr(message, "body", message)
    declared_type = getattr(message, "content_type", False)

    if not declared_type:
        # Content type not set: best-effort JSON parse, falling back to the
        # untouched payload when it is not valid JSON.
        try:
            return json_loads(payload)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return payload

    # NOTE(review): how ContentTypes handles values it does not know is not
    # visible here -- confirm against the enum definition.
    kind = ContentTypes(cast("str", declared_type))

    if kind is ContentTypes.TEXT:
        return payload.decode()

    if kind is ContentTypes.JSON:
        return json_loads(payload)

    # Recognized content type with no dedicated decoder: pass through.
    return payload