Skip to content

resolve_payloads

faststream.specification.asyncapi.utils.resolve_payloads #

resolve_payloads(payloads, extra='', served_words=1)
Source code in faststream/specification/asyncapi/utils.py
def resolve_payloads(
    payloads: list[tuple["dict[str, Any]", str]],
    extra: str = "",
    served_words: int = 1,
) -> "dict[str, Any]":
    """Collapse a list of payload schemas into a single payload schema.

    Args:
        payloads: ``(schema, handler_name)`` pairs. Each schema must contain
            a ``"title"`` key when more than one pair is supplied.
        extra: Optional segment inserted after the handler name in rebuilt
            titles; skipped when it already appears among the title's
            ``":"``-separated segments.
        served_words: Number of leading ``":"``-separated title segments to
            drop when a title is rebuilt.

    Returns:
        ``{}`` for no payloads, the sole schema object itself for exactly one
        payload, otherwise ``{"oneOf": {title: schema, ...}}``.

    Note:
        With several payloads, schemas whose title contains ``":"`` have
        their ``"title"`` rewritten in place. Schemas that end up with the
        same title overwrite one another; the last one wins.
    """
    count = len(payloads)

    if count == 0:
        return {}

    if count == 1:
        # A lone schema is handed back untouched (same object, no copy).
        return payloads[0][0]

    variants = {}
    for schema, handler in payloads:
        name = schema["title"]
        segments = name.split(":")

        # Titles without ":" are kept as-is (the original code labels the
        # multi-segment branch as the "not pydantic model" case).
        if len(segments) > 1:
            pieces = [handler]
            if extra not in segments:
                pieces.append(extra)
            pieces.extend(segments[served_words:])

            # Empty pieces (e.g. blank handler name or blank extra) are dropped.
            name = ":".join(piece for piece in pieces if piece)
            schema["title"] = name

        variants[name] = schema

    return {"oneOf": variants}