Skip to content

Channel

faststream.specification.asyncapi.v3_0_0.schema.channels.Channel #

Bases: BaseModel

A class to represent a channel.

ATTRIBUTE DESCRIPTION
address

A string representation of this channel's address.

TYPE: str

description

optional description of the channel

servers

optional list of servers associated with the channel

bindings

optional channel binding

parameters

optional parameters associated with the channel

Configurations

model_config : configuration for the model (only applicable for Pydantic version 2) Config : configuration for the class (only applicable for Pydantic version 1)

address instance-attribute #

address

description class-attribute instance-attribute #

description = None

servers class-attribute instance-attribute #

servers = None

messages instance-attribute #

messages

bindings class-attribute instance-attribute #

bindings = None

model_config class-attribute instance-attribute #

model_config = {'extra': 'allow'}

Config #

extra class-attribute instance-attribute #

extra = 'allow'

from_sub classmethod #

from_sub(address, subscriber)
Source code in faststream/specification/asyncapi/v3_0_0/schema/channels.py
@classmethod
def from_sub(cls, address: str, subscriber: SubscriberSpec) -> Self:
    message = subscriber.operation.message
    assert message.title

    *left, right = message.title.split(":")
    message.title = ":".join((*left, f"Subscribe{right}"))

    return cls(
        description=subscriber.description,
        address=address,
        messages={
            "SubscribeMessage": Message.from_spec(message),
        },
        bindings=ChannelBinding.from_sub(subscriber.bindings),
        servers=None,
    )

from_pub classmethod #

from_pub(address, publisher)
Source code in faststream/specification/asyncapi/v3_0_0/schema/channels.py
@classmethod
def from_pub(cls, address: str, publisher: PublisherSpec) -> Self:
    return cls(
        description=publisher.description,
        address=address,
        messages={
            "Message": Message.from_spec(publisher.operation.message),
        },
        bindings=ChannelBinding.from_pub(publisher.bindings),
        servers=None,
    )