Skip to content

OperationBinding

faststream.specification.asyncapi.v2_6_0.schema.bindings.OperationBinding #

Bases: BaseModel

A class to represent an operation binding.

ATTRIBUTE DESCRIPTION
amqp

AMQP operation binding (optional)

kafka

Kafka operation binding (optional)

sqs

SQS operation binding (optional)

nats

NATS operation binding (optional)

redis

Redis operation binding (optional)

http

HTTP channel binding (optional)

amqp class-attribute instance-attribute #

amqp = None

kafka class-attribute instance-attribute #

kafka = None

sqs class-attribute instance-attribute #

sqs = None

nats class-attribute instance-attribute #

nats = None

redis class-attribute instance-attribute #

redis = None

http class-attribute instance-attribute #

http = None

model_config class-attribute instance-attribute #

model_config = {'extra': 'allow'}

Config #

extra class-attribute instance-attribute #

extra = 'allow'

from_sub classmethod #

from_sub(binding: None) -> None
from_sub(binding: OperationBinding) -> Self
from_sub(binding)
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/main/operation.py
@classmethod
def from_sub(cls, binding: SpecBinding | None) -> Self | None:
    if binding is None:
        return None

    if binding.amqp and (
        amqp := amqp_bindings.OperationBinding.from_sub(binding.amqp)
    ):
        return cls(amqp=amqp)

    if binding.kafka and (
        kafka := kafka_bindings.OperationBinding.from_sub(binding.kafka)
    ):
        return cls(kafka=kafka)

    if binding.nats and (
        nats := nats_bindings.OperationBinding.from_sub(binding.nats)
    ):
        return cls(nats=nats)

    if binding.redis and (
        redis := redis_bindings.OperationBinding.from_sub(binding.redis)
    ):
        return cls(redis=redis)

    if binding.sqs and (sqs := sqs_bindings.OperationBinding.from_sub(binding.sqs)):
        return cls(sqs=sqs)

    return None

from_pub classmethod #

from_pub(binding: None) -> None
from_pub(binding: OperationBinding) -> Self
from_pub(binding)
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/main/operation.py
@classmethod
def from_pub(cls, binding: SpecBinding | None) -> Self | None:
    if binding is None:
        return None

    if binding.amqp and (
        amqp := amqp_bindings.OperationBinding.from_pub(binding.amqp)
    ):
        return cls(amqp=amqp)

    if binding.kafka and (
        kafka := kafka_bindings.OperationBinding.from_pub(binding.kafka)
    ):
        return cls(kafka=kafka)

    if binding.nats and (
        nats := nats_bindings.OperationBinding.from_pub(binding.nats)
    ):
        return cls(nats=nats)

    if binding.redis and (
        redis := redis_bindings.OperationBinding.from_pub(binding.redis)
    ):
        return cls(redis=redis)

    if binding.sqs and (sqs := sqs_bindings.OperationBinding.from_pub(binding.sqs)):
        return cls(sqs=sqs)

    return None