StreamSub(
stream: str,
polling_interval: int | None = None,
group: str | None = None,
consumer: str | None = None,
batch: bool = False,
no_ack: bool = False,
last_id: str | None = None,
maxlen: int | None = None,
max_records: int | None = None,
min_idle_time: int | None = None,
)
Bases: NameRequired
A class to represent a Redis Stream subscriber.
| PARAMETER | DESCRIPTION |
batch | Whether to consume messages in batches or not. TYPE: bool DEFAULT: False |
max_records | Number of messages to consume as one batch. TYPE: int | None DEFAULT: None |
consumer | The unique name of the consumer. https://redis.io/docs/latest/develop/tools/insight/tutorials/insight-stream-consumer/#run-the-consumer TYPE: str | None DEFAULT: None |
group | The name of the consumer group. TYPE: str | None DEFAULT: None |
last_id | An entry ID from which the subscriber picks up where it left off after a restart. TYPE: str | None DEFAULT: None |
maxlen | Redis Stream maxlen publish option. Removes the oldest messages once maxlen is exceeded. https://redis.io/docs/latest/develop/data-types/streams/#capped-streams TYPE: int | None DEFAULT: None |
name | The original Redis Stream name. |
no_ack | If True, enables the XREADGROUP NOACK subcommand. https://redis.io/docs/latest/commands/xreadgroup/#differences-between-xread-and-xreadgroup TYPE: bool DEFAULT: False |
polling_interval | Polling interval in seconds. TYPE: int | None DEFAULT: None |
min_idle_time | Minimum idle time in milliseconds for a message to be eligible for claiming via XAUTOCLAIM. Messages that have been pending (unacknowledged) for at least this duration can be reclaimed by this consumer. Only applicable when using consumer groups. https://redis.io/docs/latest/commands/xautoclaim/ TYPE: int | None DEFAULT: None |
Source code in faststream/redis/schemas/stream_sub.py
| def __init__(
self,
stream: str,
polling_interval: int | None = None,
group: str | None = None,
consumer: str | None = None,
batch: bool = False,
no_ack: bool = False,
last_id: str | None = None,
maxlen: int | None = None,
max_records: int | None = None,
min_idle_time: int | None = None,
) -> None:
# Validates the group/consumer pairing, resolves the default `last_id`,
# emits RuntimeWarnings for questionable option combinations, and stores
# every option on the instance.
#
# `group` and `consumer` must be supplied together: passing exactly one of
# them raises SetupError.
if (group and not consumer) or (not group and consumer):
msg = "You should specify `group` and `consumer` both"
raise SetupError(msg)
# When no `last_id` is given, default to ">" if both `group` and `consumer`
# are set, and to "$" otherwise.
if last_id is None:
last_id = ">" if group and consumer else "$"
# NOTE(review): indentation was lost in this listing, so the nesting of the
# checks below is not visible here. Judging by the warning messages, the
# `polling_interval` and `no_ack` checks apply to a consumer group whose
# last_id is not ">". Which `if` the trailing `elif no_ack` pairs with cannot
# be determined from this listing — confirm against the real file.
if group and consumer:
if last_id != ">":
if polling_interval:
warnings.warn(
message="`polling_interval` is not supported by consumer group with last_id other than `>`",
category=RuntimeWarning,
stacklevel=1,
)
if no_ack:
warnings.warn(
message="`no_ack` is not supported by consumer group with last_id other than `>`",
category=RuntimeWarning,
stacklevel=1,
)
elif no_ack:
warnings.warn(
message="`no_ack` has no effect with consumer group",
category=RuntimeWarning,
stacklevel=1,
)
# The stream name is passed to the base-class initializer.
super().__init__(stream)
self.group = group
self.consumer = consumer
# A falsy polling_interval (None or 0) falls back to 100.
self.polling_interval = polling_interval or 100
self.batch = batch
self.no_ack = no_ack
# Holds the resolved value, i.e. after the default above has been applied.
self.last_id = last_id
self.maxlen = maxlen
self.max_records = max_records
self.min_idle_time = min_idle_time
|
consumer instance-attribute
polling_interval instance-attribute
polling_interval = polling_interval or 100
no_ack instance-attribute
last_id instance-attribute
maxlen instance-attribute
max_records instance-attribute
max_records = max_records
min_idle_time instance-attribute
min_idle_time = min_idle_time
add_prefix
Source code in faststream/redis/schemas/stream_sub.py
def add_prefix(self, prefix: str) -> "StreamSub":
    """Return a deep copy of this object with ``prefix`` prepended to its name.

    The instance the method is called on is left unchanged.

    Args:
        prefix: Text placed directly in front of the current ``name``.

    Returns:
        The deep copy carrying the prefixed name.
    """
    prefixed_copy = deepcopy(self)
    current_name = prefixed_copy.name
    prefixed_copy.name = f"{prefix}{current_name}"
    return prefixed_copy
|
validate classmethod
validate(value: str | Self, **kwargs: Any) -> Self
validate(value: None, **kwargs: Any) -> None
validate(
value: str | Self | None, **kwargs: Any
) -> Self | None
Factory to create object.
Source code in faststream/_internal/proto.py
| @classmethod
def validate(cls, value: str | Self | None, **kwargs: Any) -> Self | None:
"""Factory to create object."""
if value is not None and isinstance(value, str):
value = cls(value, **kwargs)
return value
|