Skip to content

ListSubscriberSpecification

faststream.redis.subscriber.specification.ListSubscriberSpecification #

ListSubscriberSpecification(
    _outer_config, specification_config, calls, list_sub
)

Bases: RedisSubscriberSpecification

Source code in faststream/redis/subscriber/specification.py
def __init__(
    self,
    _outer_config: "RedisBrokerConfig",
    specification_config: "RedisSubscriberSpecificationConfig",
    calls: "CallsCollection[Any]",
    list_sub: ListSub,
) -> None:
    """Initialize the specification for a Redis list subscriber.

    Args:
        _outer_config: Broker-level configuration, forwarded unchanged to
            the base class initializer.
        specification_config: Subscriber specification configuration,
            forwarded unchanged to the base class initializer.
        calls: Collection of handler calls, forwarded unchanged to the
            base class initializer.
        list_sub: List subscription object kept on the instance as
            ``self.list_sub``.
    """
    super().__init__(_outer_config, specification_config, calls)
    # Stored after the base initializer has run.
    self.list_sub = list_sub

list_sub instance-attribute #

list_sub = list_sub

name property #

name

list_name property #

list_name

channel_binding property #

channel_binding

calls instance-attribute #

calls = calls

config instance-attribute #

config = specification_config

include_in_schema property #

include_in_schema

description property #

description

call_name property #

call_name

get_payloads #

get_payloads()
Source code in faststream/_internal/endpoint/subscriber/specification.py
def get_payloads(self) -> list[tuple["dict[str, Any]", str]]:
    """Build the payload descriptions for every registered handler call.

    Each entry is a ``(payload, name)`` pair: the payload produced by
    ``parse_handler_params`` for the handler's dependant, and the handler
    name converted with ``to_camelcase``. When no calls are registered, a
    single placeholder entry carrying only a ``title`` is returned instead.

    Returns:
        The list of ``(payload, name)`` pairs, one per handler call, or the
        single placeholder pair when ``self.calls`` is falsy.

    Raises:
        SetupError: If a handler call has no ``dependant`` set.
    """
    fallback_name = self.call_name
    collected: list[tuple[dict[str, Any], str]] = []

    for handler in self.calls:
        if handler.dependant is None:
            msg = "You should setup `Handler` at first."
            raise SetupError(msg)

        # An explicitly configured title wins over the derived call name.
        schema = parse_handler_params(
            handler.dependant,
            prefix=f"{self.config.title_ or fallback_name}:Message",
        )
        collected.append((schema, to_camelcase(handler.name)))

    if self.calls:
        return collected

    # No handlers registered: emit a title-only placeholder payload.
    placeholder = {
        "title": f"{self.config.title_ or fallback_name}:Message:Payload",
    }
    collected.append((placeholder, to_camelcase(fallback_name)))
    return collected

get_schema #

get_schema()
Source code in faststream/redis/subscriber/specification.py
def get_schema(self) -> dict[str, SubscriberSpec]:
    """Assemble the schema entry describing this subscriber.

    Returns:
        A single-item mapping from ``self.name`` to a ``SubscriberSpec``
        that carries the description, an operation whose message wraps the
        resolved payloads, and a channel binding with the ``redis`` field
        set to ``self.channel_binding``.
    """
    handler_payloads = self.get_payloads()

    channel_key = self.name
    summary = self.description

    message = Message(
        title=f"{self.name}:Message",
        payload=resolve_payloads(handler_payloads),
    )
    # Only channel-level bindings are provided; the operation has none.
    operation = Operation(message=message, bindings=None)
    channel_bindings = ChannelBinding(redis=self.channel_binding)

    subscriber_spec = SubscriberSpec(
        description=summary,
        operation=operation,
        bindings=channel_bindings,
    )
    return {channel_key: subscriber_spec}