Skip to content

StreamSub

faststream.redis.StreamSub #

StreamSub(
    stream: str,
    polling_interval: int | None = None,
    group: str | None = None,
    consumer: str | None = None,
    batch: bool = False,
    no_ack: bool = False,
    last_id: str | None = None,
    maxlen: int | None = None,
    max_records: int | None = None,
    min_idle_time: int | None = None,
)

Bases: NameRequired

A class to represent a Redis Stream subscriber.

PARAMETER DESCRIPTION
batch

Whether to consume messages in batches or not.

TYPE: bool DEFAULT: False

max_records

Number of messages to consume as one batch.

TYPE: int | None DEFAULT: None

consumer

The consumer unique name

https://redis.io/docs/latest/develop/tools/insight/tutorials/insight-stream-consumer/#run-the-consumer

TYPE: str | None DEFAULT: None

group

The name of consumer group

TYPE: str | None DEFAULT: None

last_id

An Entry ID, which uses to pick up from where it left off after it is restarted.

TYPE: str | None DEFAULT: None

maxlen

Redis Stream maxlen publish option. Remove eldest message if maxlen exceeded.

https://redis.io/docs/latest/develop/data-types/streams/#capped-streams

TYPE: int | None DEFAULT: None

name

The original Redis Stream name.

no_ack

If True, to enable the XREADGROUP NOACK subcommand.

https://redis.io/docs/latest/commands/xreadgroup/#differences-between-xread-and-xreadgroup

TYPE: bool DEFAULT: False

polling_interval

Polling interval in seconds.

TYPE: int | None DEFAULT: None

min_idle_time

Minimum idle time in milliseconds for a message to be eligible for claiming via XAUTOCLAIM. Messages that have been pending (unacknowledged) for at least this duration can be reclaimed by this consumer. Only applicable when using consumer groups.

https://redis.io/docs/latest/commands/xautoclaim/

TYPE: int | None DEFAULT: None

Source code in faststream/redis/schemas/stream_sub.py
def __init__(
    self,
    stream: str,
    polling_interval: int | None = None,
    group: str | None = None,
    consumer: str | None = None,
    batch: bool = False,
    no_ack: bool = False,
    last_id: str | None = None,
    maxlen: int | None = None,
    max_records: int | None = None,
    min_idle_time: int | None = None,
) -> None:
    if (group and not consumer) or (not group and consumer):
        msg = "You should specify `group` and `consumer` both"
        raise SetupError(msg)

    if last_id is None:
        last_id = ">" if group and consumer else "$"

    if group and consumer:
        if last_id != ">":
            if polling_interval:
                warnings.warn(
                    message="`polling_interval` is not supported by consumer group with last_id other than `>`",
                    category=RuntimeWarning,
                    stacklevel=1,
                )

            if no_ack:
                warnings.warn(
                    message="`no_ack` is not supported by consumer group with last_id other than `>`",
                    category=RuntimeWarning,
                    stacklevel=1,
                )

        elif no_ack:
            warnings.warn(
                message="`no_ack` has no effect with consumer group",
                category=RuntimeWarning,
                stacklevel=1,
            )

    super().__init__(stream)

    self.group = group
    self.consumer = consumer
    self.polling_interval = polling_interval or 100
    self.batch = batch
    self.no_ack = no_ack
    self.last_id = last_id
    self.maxlen = maxlen
    self.max_records = max_records
    self.min_idle_time = min_idle_time

name instance-attribute #

name = name

group instance-attribute #

group = group

consumer instance-attribute #

consumer = consumer

polling_interval instance-attribute #

polling_interval = polling_interval or 100

batch instance-attribute #

batch = batch

no_ack instance-attribute #

no_ack = no_ack

last_id instance-attribute #

last_id = last_id

maxlen instance-attribute #

maxlen = maxlen

max_records instance-attribute #

max_records = max_records

min_idle_time instance-attribute #

min_idle_time = min_idle_time

validate classmethod #

validate(value: str | Self, **kwargs: Any) -> Self
validate(value: None, **kwargs: Any) -> None
validate(
    value: str | Self | None, **kwargs: Any
) -> Self | None

Factory to create object.

Source code in faststream/_internal/proto.py
@classmethod
def validate(cls, value: str | Self | None, **kwargs: Any) -> Self | None:
    """Factory to create object."""
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value

add_prefix #

add_prefix(prefix: str) -> StreamSub
Source code in faststream/redis/schemas/stream_sub.py
def add_prefix(self, prefix: str) -> "StreamSub":
    new_stream = deepcopy(self)
    new_stream.name = f"{prefix}{new_stream.name}"
    return new_stream