Skip to content

ChannelBinding

faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp.channel.ChannelBinding #

Bases: BaseModel

A class to represent an AMQP channel binding.

ATTRIBUTE DESCRIPTION
is_

Type of binding, can be "queue" or "routingKey"

bindingVersion

Version of the binding

queue

Optional queue object

exchange

Optional exchange object

is_ class-attribute instance-attribute #

is_ = Field(..., alias='is')

bindingVersion class-attribute instance-attribute #

bindingVersion = '0.2.0'

queue class-attribute instance-attribute #

queue = None

exchange class-attribute instance-attribute #

exchange = None

from_sub classmethod #

from_sub(binding)
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/channel.py
@classmethod
def from_sub(cls, binding: amqp.ChannelBinding | None) -> Self | None:
    """Create a channel binding from a specification binding (``from_sub`` variant).

    Args:
        binding: Specification-level AMQP channel binding, or ``None``.

    Returns:
        ``None`` when ``binding`` is ``None``; otherwise a new instance whose
        ``is`` field is ``"routingKey"``. The ``queue`` field is populated
        only when ``binding.exchange.is_respect_routing_key`` is truthy.
    """
    if binding is None:
        return None

    # The queue is described only when the exchange respects routing keys.
    if binding.exchange.is_respect_routing_key:
        queue = Queue.from_spec(binding.queue, binding.virtual_host)
    else:
        queue = None

    exchange = Exchange.from_spec(binding.exchange, binding.virtual_host)

    # "is" is a Python keyword, so the aliased field goes through a mapping.
    fields = {"is": "routingKey", "queue": queue, "exchange": exchange}
    return cls(**fields)

from_pub classmethod #

from_pub(binding)
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/channel.py
@classmethod
def from_pub(cls, binding: amqp.ChannelBinding | None) -> Self | None:
    """Create a channel binding from a specification binding (``from_pub`` variant).

    Args:
        binding: Specification-level AMQP channel binding, or ``None``.

    Returns:
        ``None`` when ``binding`` is ``None``; otherwise a new instance whose
        ``is`` field is ``"routingKey"``. The ``queue`` field is populated
        only when ``binding.exchange.is_respect_routing_key`` is truthy and
        ``binding.queue.name`` is truthy as well.
    """
    if binding is None:
        return None

    # Default to no queue; describe it only when routing keys are respected
    # and the queue has a truthy name.
    queue = None
    if binding.exchange.is_respect_routing_key and binding.queue.name:
        queue = Queue.from_spec(binding.queue, binding.virtual_host)

    exchange = Exchange.from_spec(binding.exchange, binding.virtual_host)

    # "is" is a Python keyword, so the aliased field goes through a mapping.
    fields = {"is": "routingKey", "queue": queue, "exchange": exchange}
    return cls(**fields)