Bases: BaseModel
A class to represent channel bindings.
ATTRIBUTE |
DESCRIPTION |
amqp |
AMQP channel binding (optional)
|
kafka |
Kafka channel binding (optional)
|
sqs |
SQS channel binding (optional)
|
nats |
NATS channel binding (optional)
|
redis |
Redis channel binding (optional)
|
amqp
class-attribute
instance-attribute
kafka
class-attribute
instance-attribute
sqs
class-attribute
instance-attribute
nats
class-attribute
instance-attribute
redis
class-attribute
instance-attribute
model_config
class-attribute
instance-attribute
model_config = {'extra': 'allow'}
Config
extra
class-attribute
instance-attribute
from_sub
classmethod
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/main/channel.py
| @classmethod
def from_sub(cls, binding: SpecBinding | None) -> Self | None:
if binding is None:
return None
if binding.amqp and (amqp := amqp_bindings.ChannelBinding.from_sub(binding.amqp)):
return cls(amqp=amqp)
if binding.kafka and (
kafka := kafka_bindings.ChannelBinding.from_sub(binding.kafka)
):
return cls(kafka=kafka)
if binding.nats and (nats := nats_bindings.ChannelBinding.from_sub(binding.nats)):
return cls(nats=nats)
if binding.redis and (
redis := redis_bindings.ChannelBinding.from_sub(binding.redis)
):
return cls(redis=redis)
if binding.sqs and (sqs := sqs_bindings.ChannelBinding.from_sub(binding.sqs)):
return cls(sqs=sqs)
return None
|
from_pub
classmethod
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/main/channel.py
| @classmethod
def from_pub(cls, binding: SpecBinding | None) -> Self | None:
if binding is None:
return None
if binding.amqp and (amqp := amqp_bindings.ChannelBinding.from_pub(binding.amqp)):
return cls(amqp=amqp)
if binding.kafka and (
kafka := kafka_bindings.ChannelBinding.from_pub(binding.kafka)
):
return cls(kafka=kafka)
if binding.nats and (nats := nats_bindings.ChannelBinding.from_pub(binding.nats)):
return cls(nats=nats)
if binding.redis and (
redis := redis_bindings.ChannelBinding.from_pub(binding.redis)
):
return cls(redis=redis)
if binding.sqs and (sqs := sqs_bindings.ChannelBinding.from_pub(binding.sqs)):
return cls(sqs=sqs)
return None
|