Skip to content

StreamSub

faststream.redis.schemas.StreamSub #

StreamSub(
    stream,
    polling_interval=None,
    group=None,
    consumer=None,
    batch=False,
    no_ack=False,
    last_id=None,
    maxlen=None,
    max_records=None,
)

Bases: NameRequired

A class to represent a Redis Stream subscriber.

PARAMETER DESCRIPTION
batch

Whether to consume messages in batches or not.

TYPE: bool DEFAULT: False

max_records

Number of messages to consume as one batch.

TYPE: int | None DEFAULT: None

consumer

The consumer unique name

https://redis.io/docs/latest/develop/tools/insight/tutorials/insight-stream-consumer/#run-the-consumer

TYPE: str | None DEFAULT: None

group

The name of consumer group

TYPE: str | None DEFAULT: None

last_id

An Entry ID, which uses to pick up from where it left off after it is restarted.

TYPE: str | None DEFAULT: None

maxlen

Redis Stream maxlen publish option. Remove eldest message if maxlen exceeded.

https://redis.io/docs/latest/develop/data-types/streams/#capped-streams

TYPE: int | None DEFAULT: None

name

The original Redis Stream name.

no_ack

If True, to enable the XREADGROUP NOACK subcommand.

https://redis.io/docs/latest/commands/xreadgroup/#differences-between-xread-and-xreadgroup

TYPE: bool DEFAULT: False

polling_interval

Polling interval in seconds.

TYPE: int | None DEFAULT: None

Source code in faststream/redis/schemas/stream_sub.py
def __init__(
    self,
    stream: str,
    polling_interval: int | None = None,
    group: str | None = None,
    consumer: str | None = None,
    batch: bool = False,
    no_ack: bool = False,
    last_id: str | None = None,
    maxlen: int | None = None,
    max_records: int | None = None,
) -> None:
    if (group and not consumer) or (not group and consumer):
        msg = "You should specify `group` and `consumer` both"
        raise SetupError(msg)

    if group and consumer:
        if last_id != ">":
            if polling_interval:
                warnings.warn(
                    message="`polling_interval` is not supported by consumer group with last_id other than `>`",
                    category=RuntimeWarning,
                    stacklevel=1,
                )

            if no_ack:
                warnings.warn(
                    message="`no_ack` is not supported by consumer group with last_id other than `>`",
                    category=RuntimeWarning,
                    stacklevel=1,
                )

        elif no_ack:
            warnings.warn(
                message="`no_ack` has no effect with consumer group",
                category=RuntimeWarning,
                stacklevel=1,
            )

    if last_id is None:
        last_id = ">" if group and consumer else "$"

    super().__init__(stream)

    self.group = group
    self.consumer = consumer
    self.polling_interval = polling_interval or 100
    self.batch = batch
    self.no_ack = no_ack
    self.last_id = last_id
    self.maxlen = maxlen
    self.max_records = max_records

name instance-attribute #

name = name

group instance-attribute #

group = group

consumer instance-attribute #

consumer = consumer

polling_interval instance-attribute #

polling_interval = polling_interval or 100

batch instance-attribute #

batch = batch

no_ack instance-attribute #

no_ack = no_ack

last_id instance-attribute #

last_id = last_id

maxlen instance-attribute #

maxlen = maxlen

max_records instance-attribute #

max_records = max_records

validate classmethod #

validate(value: str | Self, **kwargs: Any) -> Self
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)

Factory to create object.

Source code in faststream/_internal/proto.py
@classmethod
def validate(cls, value: str | Self | None, **kwargs: Any) -> Self | None:
    """Factory to create object."""
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value

add_prefix #

add_prefix(prefix)
Source code in faststream/redis/schemas/stream_sub.py
def add_prefix(self, prefix: str) -> "StreamSub":
    new_stream = deepcopy(self)
    new_stream.name = f"{prefix}{new_stream.name}"
    return new_stream