Skip to content

OperationBinding

faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp.operation.OperationBinding #

Bases: BaseModel

A class to represent an operation binding.

cc class-attribute instance-attribute #

cc: str | None = None

ack instance-attribute #

ack: bool

replyTo class-attribute instance-attribute #

replyTo: str | None = None

deliveryMode class-attribute instance-attribute #

deliveryMode: int | None = None

mandatory class-attribute instance-attribute #

mandatory: bool | None = None

priority class-attribute instance-attribute #

priority: PositiveInt | None = None

bindingVersion class-attribute instance-attribute #

bindingVersion: str = '0.2.0'

from_sub classmethod #

from_sub(binding: OperationBinding | None) -> Self | None
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/operation.py
@classmethod
def from_sub(cls, binding: amqp.OperationBinding | None) -> Self | None:
    if not binding:
        return None

    return cls(
        cc=binding.routing_key if binding.exchange.is_respect_routing_key else None,
        ack=binding.ack,
        replyTo=binding.reply_to,
        deliveryMode=None if binding.persist is None else int(binding.persist) + 1,
        mandatory=binding.mandatory,
        priority=binding.priority,
    )

from_pub classmethod #

from_pub(binding: OperationBinding | None) -> Self | None
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/operation.py
@classmethod
def from_pub(cls, binding: amqp.OperationBinding | None) -> Self | None:
    if not binding:
        return None

    return cls(
        cc=binding.routing_key if binding.exchange.is_respect_routing_key else None,
        ack=binding.ack,
        replyTo=binding.reply_to,
        deliveryMode=None if binding.persist is None else int(binding.persist) + 1,
        mandatory=binding.mandatory,
        priority=binding.priority,
    )