Bases: BaseModel
A class to represent an operation binding.
amqp class-attribute instance-attribute
kafka class-attribute instance-attribute
sqs class-attribute instance-attribute
nats class-attribute instance-attribute
redis class-attribute instance-attribute
http class-attribute instance-attribute
model_config class-attribute instance-attribute
model_config = {'extra': 'allow'}
Config
extra class-attribute instance-attribute
from_sub classmethod
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/main/operation.py
| @classmethod
def from_sub(cls, binding: SpecBinding | None) -> Self | None:
    """Build an operation binding from a subscriber-side specification binding.

    Protocols are tried in a fixed order (amqp, kafka, nats, redis, sqs).
    The first protocol that is set on ``binding`` and whose protocol-specific
    ``OperationBinding.from_sub`` returns a truthy value is used.

    Args:
        binding: The specification binding to convert, or ``None``.

    Returns:
        An instance of ``cls`` constructed with that single protocol keyword,
        or ``None`` when ``binding`` is ``None`` or no protocol produced a
        truthy binding.
    """
    if binding is None:
        return None

    # Ordered (field name, converter) pairs. The lambdas keep each
    # protocol module lookup lazy: it only happens when that protocol
    # is actually present on ``binding``.
    converters = (
        ("amqp", lambda spec: amqp_bindings.OperationBinding.from_sub(spec)),
        ("kafka", lambda spec: kafka_bindings.OperationBinding.from_sub(spec)),
        ("nats", lambda spec: nats_bindings.OperationBinding.from_sub(spec)),
        ("redis", lambda spec: redis_bindings.OperationBinding.from_sub(spec)),
        ("sqs", lambda spec: sqs_bindings.OperationBinding.from_sub(spec)),
    )

    for field_name, convert in converters:
        protocol_spec = getattr(binding, field_name)
        if not protocol_spec:
            continue
        converted = convert(protocol_spec)
        if converted:
            return cls(**{field_name: converted})

    return None
|
from_pub classmethod
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/main/operation.py
| @classmethod
def from_pub(cls, binding: SpecBinding | None) -> Self | None:
    """Build an operation binding from a publisher-side specification binding.

    Protocols are tried in a fixed order (amqp, kafka, nats, redis, sqs).
    The first protocol that is set on ``binding`` and whose protocol-specific
    ``OperationBinding.from_pub`` returns a truthy value is used.

    Args:
        binding: The specification binding to convert, or ``None``.

    Returns:
        An instance of ``cls`` constructed with that single protocol keyword,
        or ``None`` when ``binding`` is ``None`` or no protocol produced a
        truthy binding.
    """
    if binding is None:
        return None

    # Ordered (field name, converter) pairs. The lambdas keep each
    # protocol module lookup lazy: it only happens when that protocol
    # is actually present on ``binding``.
    converters = (
        ("amqp", lambda spec: amqp_bindings.OperationBinding.from_pub(spec)),
        ("kafka", lambda spec: kafka_bindings.OperationBinding.from_pub(spec)),
        ("nats", lambda spec: nats_bindings.OperationBinding.from_pub(spec)),
        ("redis", lambda spec: redis_bindings.OperationBinding.from_pub(spec)),
        ("sqs", lambda spec: sqs_bindings.OperationBinding.from_pub(spec)),
    )

    for field_name, convert in converters:
        protocol_spec = getattr(binding, field_name)
        if not protocol_spec:
            continue
        converted = convert(protocol_spec)
        if converted:
            return cls(**{field_name: converted})

    return None
|