Bases: BaseModel
A class to represent an operation binding.
cc class-attribute instance-attribute
replyTo class-attribute instance-attribute
replyTo: str | None = None
deliveryMode class-attribute instance-attribute
deliveryMode: int | None = None
mandatory class-attribute instance-attribute
mandatory: bool | None = None
priority class-attribute instance-attribute
priority: PositiveInt | None = None
bindingVersion class-attribute instance-attribute
bindingVersion: str = '0.2.0'
from_sub classmethod
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/operation.py
@classmethod
def from_sub(cls, binding: amqp.OperationBinding | None) -> Self | None:
    """Build an instance from a specification-level AMQP operation binding.

    NOTE(review): the ``_sub`` suffix suggests this is the subscriber-side
    constructor; confirm against the callers.

    Args:
        binding: The source operation binding, or ``None``.

    Returns:
        A new instance of ``cls`` populated from ``binding``, or ``None``
        when ``binding`` is falsy.
    """
    if not binding:
        return None

    # The routing key is exposed as ``cc`` only when the exchange reports
    # that it respects routing keys.
    carbon_copy = None
    if binding.exchange.is_respect_routing_key:
        carbon_copy = binding.routing_key

    # An unset ``persist`` stays unset; otherwise its integer value is
    # shifted by one (so, assuming a bool, False -> 1 and True -> 2).
    delivery_mode = None
    if binding.persist is not None:
        delivery_mode = int(binding.persist) + 1

    return cls(
        cc=carbon_copy,
        ack=binding.ack,
        replyTo=binding.reply_to,
        deliveryMode=delivery_mode,
        mandatory=binding.mandatory,
        priority=binding.priority,
    )
|
from_pub classmethod
Source code in faststream/specification/asyncapi/v2_6_0/schema/bindings/amqp/operation.py
@classmethod
def from_pub(cls, binding: amqp.OperationBinding | None) -> Self | None:
    """Build an instance from a specification-level AMQP operation binding.

    NOTE(review): the ``_pub`` suffix suggests this is the publisher-side
    constructor; confirm against the callers.

    Args:
        binding: The source operation binding, or ``None``.

    Returns:
        A new instance of ``cls`` populated from ``binding``, or ``None``
        when ``binding`` is falsy.
    """
    if not binding:
        return None

    # ``cc`` carries the routing key only when the exchange reports that it
    # respects routing keys; otherwise it is left unset.
    respects_key = binding.exchange.is_respect_routing_key
    carbon_copy = binding.routing_key if respects_key else None

    # Keep an unset ``persist`` as ``None``; otherwise shift its integer
    # value by one (so, assuming a bool, False -> 1 and True -> 2).
    persist = binding.persist
    delivery_mode = int(persist) + 1 if persist is not None else None

    fields = {
        "cc": carbon_copy,
        "ack": binding.ack,
        "replyTo": binding.reply_to,
        "deliveryMode": delivery_mode,
        "mandatory": binding.mandatory,
        "priority": binding.priority,
    }
    return cls(**fields)
|