Bases: BaseModel
A class to represent a channel.
Configurations
model_config : configuration for the model (only applicable for Pydantic version 2) Config : configuration for the class (only applicable for Pydantic version 1)
address instance-attribute
description class-attribute instance-attribute
description: str | None = None
servers class-attribute instance-attribute
servers: list[dict[str, str]] | None = None
messages instance-attribute
bindings class-attribute instance-attribute
model_config class-attribute instance-attribute
model_config = {'extra': 'allow'}
from_sub classmethod
Source code in faststream/specification/asyncapi/v3_0_0/schema/channels.py
| @classmethod
def from_sub(cls, address: str, subscriber: SubscriberSpec) -> Self:
message = subscriber.operation.message
assert message.title
*left, right = message.title.split(":")
message.title = ":".join((*left, f"Subscribe{right}"))
return cls(
description=subscriber.description,
address=address,
messages={
"SubscribeMessage": Message.from_spec(message),
},
bindings=ChannelBinding.from_sub(subscriber.bindings),
servers=None,
)
|
from_pub classmethod
Source code in faststream/specification/asyncapi/v3_0_0/schema/channels.py
| @classmethod
def from_pub(cls, address: str, publisher: PublisherSpec) -> Self:
return cls(
description=publisher.description,
address=address,
messages={
"Message": Message.from_spec(publisher.operation.message),
},
bindings=ChannelBinding.from_pub(publisher.bindings),
servers=None,
)
|