Skip to content

ChannelBinding

faststream.specification.asyncapi.v3_0_0.schema.bindings.main.ChannelBinding #

Bases: BaseModel

A class to represent channel bindings.

amqp class-attribute instance-attribute #

amqp: ChannelBinding | None = None

kafka class-attribute instance-attribute #

kafka: ChannelBinding | None = None

sqs class-attribute instance-attribute #

sqs: ChannelBinding | None = None

nats class-attribute instance-attribute #

nats: ChannelBinding | None = None

redis class-attribute instance-attribute #

redis: ChannelBinding | None = None

model_config class-attribute instance-attribute #

model_config = {'extra': 'allow'}

Config #

extra class-attribute instance-attribute #

extra = 'allow'

from_sub classmethod #

from_sub(binding: ChannelBinding | None) -> Self | None
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/main/channel.py
@classmethod
def from_sub(cls, binding: SpecBinding | None) -> Self | None:
    if binding is None:
        return None

    if binding.amqp and (amqp := amqp_bindings.ChannelBinding.from_sub(binding.amqp)):
        return cls(amqp=amqp)

    if binding.kafka and (
        kafka := kafka_bindings.ChannelBinding.from_sub(binding.kafka)
    ):
        return cls(kafka=kafka)

    if binding.nats and (nats := nats_bindings.ChannelBinding.from_sub(binding.nats)):
        return cls(nats=nats)

    if binding.redis and (
        redis := redis_bindings.ChannelBinding.from_sub(binding.redis)
    ):
        return cls(redis=redis)

    if binding.sqs and (sqs := sqs_bindings.ChannelBinding.from_sub(binding.sqs)):
        return cls(sqs=sqs)

    return None

from_pub classmethod #

from_pub(binding: ChannelBinding | None) -> Self | None
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/main/channel.py
@classmethod
def from_pub(cls, binding: SpecBinding | None) -> Self | None:
    if binding is None:
        return None

    if binding.amqp and (amqp := amqp_bindings.ChannelBinding.from_pub(binding.amqp)):
        return cls(amqp=amqp)

    if binding.kafka and (
        kafka := kafka_bindings.ChannelBinding.from_pub(binding.kafka)
    ):
        return cls(kafka=kafka)

    if binding.nats and (nats := nats_bindings.ChannelBinding.from_pub(binding.nats)):
        return cls(nats=nats)

    if binding.redis and (
        redis := redis_bindings.ChannelBinding.from_pub(binding.redis)
    ):
        return cls(redis=redis)

    if binding.sqs and (sqs := sqs_bindings.ChannelBinding.from_pub(binding.sqs)):
        return cls(sqs=sqs)

    return None