Skip to content

OperationBinding

faststream.specification.asyncapi.v3_0_0.schema.bindings.amqp.operation.OperationBinding #

Bases: BaseModel

cc class-attribute instance-attribute #

cc = None

ack instance-attribute #

ack

replyTo class-attribute instance-attribute #

replyTo = None

deliveryMode class-attribute instance-attribute #

deliveryMode = None

mandatory class-attribute instance-attribute #

mandatory = None

priority class-attribute instance-attribute #

priority = None

bindingVersion class-attribute instance-attribute #

bindingVersion = '0.3.0'

from_sub classmethod #

from_sub(binding)
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/operation.py
@classmethod
def from_sub(cls, binding: amqp.OperationBinding | None) -> Self | None:
    """Create the schema binding from a specification-level AMQP operation binding.

    Args:
        binding: The specification binding to convert, or a falsy value
            (such as ``None``) when there is nothing to convert.

    Returns:
        A new instance of ``cls``, or ``None`` when ``binding`` is falsy.
        ``bindingVersion`` is not passed, so the class default applies.
    """
    if binding:
        # The routing key is exposed as a one-element ``cc`` list only when it
        # is non-empty and the exchange reports that it respects routing keys.
        routing_key = binding.routing_key
        if routing_key and binding.exchange.is_respect_routing_key:
            cc = [routing_key]
        else:
            cc = None

        # ``persist`` is converted with ``int(...) + 1`` (False -> 1, True -> 2);
        # an unset value stays ``None``.
        persist = binding.persist
        delivery_mode = int(persist) + 1 if persist is not None else None

        return cls(
            cc=cc,
            ack=binding.ack,
            replyTo=binding.reply_to,
            deliveryMode=delivery_mode,
            mandatory=binding.mandatory,
            priority=binding.priority,
        )

    return None

from_pub classmethod #

from_pub(binding)
Source code in faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/operation.py
@classmethod
def from_pub(cls, binding: amqp.OperationBinding | None) -> Self | None:
    """Create the schema binding from a specification-level AMQP operation binding.

    Args:
        binding: The specification binding to convert, or a falsy value
            (such as ``None``) when there is nothing to convert.

    Returns:
        A new instance of ``cls``, or ``None`` when ``binding`` is falsy.
        ``bindingVersion`` is not passed, so the class default applies.
    """
    if not binding:
        return None

    # ``cc`` carries the routing key only when one is set and the exchange
    # reports that it respects routing keys; the exchange is not consulted
    # when the routing key is empty.
    key = binding.routing_key
    include_key = bool(key) and binding.exchange.is_respect_routing_key
    carbon_copy = [key] if include_key else None

    # ``persist`` is converted with ``int(...) + 1`` (False -> 1, True -> 2);
    # an unset value stays ``None``.
    persist_flag = binding.persist
    if persist_flag is None:
        mode = None
    else:
        mode = int(persist_flag) + 1

    return cls(
        cc=carbon_copy,
        ack=binding.ack,
        replyTo=binding.reply_to,
        deliveryMode=mode,
        mandatory=binding.mandatory,
        priority=binding.priority,
    )