Skip to content

ChannelBinding

faststream.specification.asyncapi.v3_0_0.schema.bindings.amqp.ChannelBinding #

Bases: ChannelBinding

is_ class-attribute instance-attribute #

is_ = Field(..., alias='is')

queue class-attribute instance-attribute #

queue = None

exchange class-attribute instance-attribute #

exchange = None

bindingVersion class-attribute instance-attribute #

bindingVersion = '0.3.0'

from_sub classmethod #

from_sub(binding)
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py
@classmethod
def from_sub(cls, binding: amqp.ChannelBinding | None) -> Self | None:
    if binding is None:
        return None

    return cls(
        **{
            "is": "queue",
            "queue": Queue.from_spec(binding.queue, binding.virtual_host),
        },
    )

from_pub classmethod #

from_pub(binding)
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py
@classmethod
def from_pub(cls, binding: amqp.ChannelBinding | None) -> Self | None:
    if binding is None:
        return None

    return cls(
        **{
            "is": "routingKey",
            "exchange": Exchange.from_spec(binding.exchange, binding.virtual_host),
        },
    )