Skip to content

ListSub

faststream.redis.schemas.ListSub #

ListSub(
    list_name,
    batch=False,
    max_records=10,
    polling_interval=0.1,
)

Bases: NameRequired

A class to represent a Redis List subscriber.

Source code in faststream/redis/schemas/list_sub.py
def __init__(
    self,
    list_name: str,
    batch: bool = False,
    max_records: int = 10,
    polling_interval: float = 0.1,
) -> None:
    """Initialize the Redis List subscriber settings.

    Args:
        list_name: Name forwarded to the parent initializer.
        batch: Flag stored unchanged as ``self.batch``.
        max_records: Value stored unchanged as ``self.max_records``.
        polling_interval: Value stored unchanged as
            ``self.polling_interval`` (presumably seconds -- TODO confirm
            against the code that consumes it).
    """
    # The parent class receives the list name; how it is stored is defined there.
    super().__init__(list_name)

    # No validation or conversion happens here: arguments are kept as given.
    self.batch = batch
    self.max_records = max_records
    self.polling_interval = polling_interval

name instance-attribute #

name = name

batch instance-attribute #

batch = batch

max_records instance-attribute #

max_records = max_records

polling_interval instance-attribute #

polling_interval = polling_interval

records cached property #

records

validate classmethod #

validate(value: str | Self, **kwargs: Any) -> Self
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)

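Factory method: builds an instance from a string (forwarding any extra keyword arguments to the constructor); any other value, including None, is returned unchanged.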

Source code in faststream/_internal/proto.py
@classmethod
def validate(cls, value: str | Self | None, **kwargs: Any) -> Self | None:
    """Factory to create object."""
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value

add_prefix #

add_prefix(prefix)
Source code in faststream/redis/schemas/list_sub.py
def add_prefix(self, prefix: str) -> "ListSub":
    """Return a deep copy of this object whose name starts with ``prefix``.

    The original object is left unmodified; only the copy gets the new name.

    Args:
        prefix: Text placed directly in front of the existing name.

    Returns:
        The renamed deep copy.
    """
    prefixed = deepcopy(self)
    # Build the name from the copy's own attribute, then rebind it on the copy.
    combined_name = f"{prefix}{prefixed.name}"
    prefixed.name = combined_name
    return prefixed