Skip to content

Exchange

faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp.channel.Exchange #

Bases: BaseModel

A class to represent an exchange.

name class-attribute instance-attribute #

name: str | None = None

type instance-attribute #

type: Literal[
    "default",
    "direct",
    "topic",
    "fanout",
    "headers",
    "x-delayed-message",
    "x-consistent-hash",
    "x-modulus-hash",
]

durable class-attribute instance-attribute #

durable: bool | None = None

autoDelete class-attribute instance-attribute #

autoDelete: bool | None = None

vhost class-attribute instance-attribute #

vhost: str = '/'

from_spec classmethod #

from_spec(binding: None, vhost: str) -> None
from_spec(binding: Exchange, vhost: str) -> Self
from_spec(
    binding: Exchange | None, vhost: str
) -> Self | None
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/channel.py
@classmethod
def from_spec(cls, binding: amqp.Exchange | None, vhost: str) -> Self | None:
    if binding is None:
        return None

    return cls(
        name=binding.name,
        type=binding.type,
        durable=binding.durable,
        autoDelete=binding.auto_delete,
        vhost=vhost,
    )