Skip to content

move_pydantic_refs

faststream.specification.asyncapi.utils.move_pydantic_refs #

move_pydantic_refs(original, key)

Remove pydantic references and replacem them by real schemas.

Source code in faststream/specification/asyncapi/utils.py
def move_pydantic_refs(
    original: Any,
    key: str,
) -> Any:
    """Remove pydantic references and replacem them by real schemas."""
    if not isinstance(original, dict):
        return original

    data = original.copy()

    for k in data:
        item = data[k]

        if isinstance(item, str):
            if key in item:
                data[k] = data[k].replace(key, "components/schemas")

        elif isinstance(item, dict):
            data[k] = move_pydantic_refs(data[k], key)

        elif isinstance(item, list):
            for i in range(len(data[k])):
                data[k][i] = move_pydantic_refs(item[i], key)

    if (
        isinstance(desciminator := data.get("discriminator"), dict)
        and "propertyName" in desciminator
    ):
        data["discriminator"] = desciminator["propertyName"]

    return data