Skip to content

OperationBinding

faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp.operation.OperationBinding #

Bases: BaseModel

A class to represent an operation binding.

ATTRIBUTE DESCRIPTION
cc

optional string representing the cc

ack

boolean indicating if the operation is acknowledged

replyTo

optional dictionary representing the replyTo

bindingVersion

string representing the binding version

cc class-attribute instance-attribute #

cc = None

ack instance-attribute #

ack

replyTo class-attribute instance-attribute #

replyTo = None

deliveryMode class-attribute instance-attribute #

deliveryMode = None

mandatory class-attribute instance-attribute #

mandatory = None

priority class-attribute instance-attribute #

priority = None

bindingVersion class-attribute instance-attribute #

bindingVersion = '0.2.0'

from_sub classmethod #

from_sub(binding)
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/operation.py
@classmethod
def from_sub(cls, binding: amqp.OperationBinding | None) -> Self | None:
    if not binding:
        return None

    return cls(
        cc=binding.routing_key if binding.exchange.is_respect_routing_key else None,
        ack=binding.ack,
        replyTo=binding.reply_to,
        deliveryMode=None if binding.persist is None else int(binding.persist) + 1,
        mandatory=binding.mandatory,
        priority=binding.priority,
    )

from_pub classmethod #

from_pub(binding)
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/operation.py
@classmethod
def from_pub(cls, binding: amqp.OperationBinding | None) -> Self | None:
    if not binding:
        return None

    return cls(
        cc=binding.routing_key if binding.exchange.is_respect_routing_key else None,
        ack=binding.ack,
        replyTo=binding.reply_to,
        deliveryMode=None if binding.persist is None else int(binding.persist) + 1,
        mandatory=binding.mandatory,
        priority=binding.priority,
    )