StreamSubscriber

faststream.redis.subscriber.usecases.stream_subscriber.StreamSubscriber #

StreamSubscriber(
    config: RedisSubscriberConfig,
    specification: SubscriberSpecification[Any, Any],
    calls: CallsCollection[Any],
)

Bases: _StreamHandlerMixin

Source code in faststream/redis/subscriber/usecases/stream_subscriber.py
def __init__(
    self,
    config: "RedisSubscriberConfig",
    specification: "SubscriberSpecification[Any, Any]",
    calls: "CallsCollection[Any]",
) -> None:
    parser = RedisStreamParser(config)
    config.decoder = parser.decode_message
    config.parser = parser.parse_message
    super().__init__(config, specification, calls)
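
Example (a minimal sketch, not part of the library source): a StreamSubscriber is normally created for you by RedisBroker.subscriber(stream=...) rather than constructed directly. The broker URL and stream name below are illustrative.

from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker("redis://localhost:6379")

@broker.subscriber(stream=StreamSub("my-stream"))
async def handle(body: str) -> None:
    print(body)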

lock instance-attribute #

lock: AbstractContextManager[Any] = FakeContext()

extra_watcher_options instance-attribute #

extra_watcher_options: dict[str, Any] = {}

graceful_timeout instance-attribute #

graceful_timeout: float | None

calls instance-attribute #

calls = calls

specification instance-attribute #

specification = specification

ack_policy instance-attribute #

ack_policy = ack_policy

running instance-attribute #

running = False

tasks instance-attribute #

tasks: list[Task[Any]] = []

config instance-attribute #

config = config

last_id instance-attribute #

last_id = last_id

min_idle_time instance-attribute #

min_idle_time = min_idle_time

autoclaim_start_id instance-attribute #

autoclaim_start_id = b'0-0'

stream_sub property #

stream_sub: StreamSub

start async #

start() -> None
Source code in faststream/redis/subscriber/usecases/stream_subscriber.py
@override
async def start(self) -> None:
    client = self._client

    self.extra_watcher_options.update(
        redis=client,
        group=self.stream_sub.group,
    )

    stream = self.stream_sub

    read: Callable[
        [str],
        Awaitable[
            tuple[
                tuple[
                    TopicName,
                    tuple[
                        tuple[
                            Offset,
                            dict[bytes, bytes],
                        ],
                        ...,
                    ],
                ],
                ...,
            ],
        ],
    ]

    if stream.group and stream.consumer:
        group_create_id = "$" if self.last_id == ">" else self.last_id
        try:
            await client.xgroup_create(
                name=stream.name,
                id=group_create_id,
                groupname=stream.group,
                mkstream=True,
            )
        except ResponseError as e:
            if "already exists" not in str(e):
                raise

        def read(
            _: str,
        ) -> Awaitable[
            tuple[
                tuple[
                    TopicName,
                    tuple[
                        tuple[
                            Offset,
                            dict[bytes, bytes],
                        ],
                        ...,
                    ],
                ],
                ...,
            ],
        ]:
            return client.xreadgroup(
                groupname=stream.group,
                consumername=stream.consumer,
                streams={stream.name: stream.last_id},
                count=stream.max_records,
                block=stream.polling_interval,
                noack=stream.no_ack,
            )

    elif self.stream_sub.min_idle_time is None:

        def read(
            last_id: str,
        ) -> Awaitable[
            tuple[
                tuple[
                    TopicName,
                    tuple[
                        tuple[
                            Offset,
                            dict[bytes, bytes],
                        ],
                        ...,
                    ],
                ],
                ...,
            ],
        ]:
            return client.xread(
                {stream.name: last_id},
                block=stream.polling_interval,
                count=stream.max_records,
            )

    else:

        def read(
            _: str,
        ) -> Coroutine[
            Any,
            Any,
            tuple[
                tuple[
                    TopicName,
                    tuple[
                        tuple[
                            Offset,
                            dict[bytes, bytes],
                        ],
                        ...,
                    ],
                ],
                ...,
            ],
        ]:
            async def xautoclaim() -> tuple[
                tuple[TopicName, tuple[tuple[Offset, dict[bytes, bytes]], ...]], ...
            ]:
                stream_message = await client.xautoclaim(
                    name=self.stream_sub.name,
                    groupname=self.stream_sub.group,
                    consumername=self.stream_sub.consumer,
                    min_idle_time=self.min_idle_time,
                    start_id=self.autoclaim_start_id,
                    count=1,
                )
                stream_name = self.stream_sub.name.encode()
                (next_id, messages, _) = stream_message
                # Update start_id for next call
                self.autoclaim_start_id = next_id
                if not messages:
                    return ()
                return ((stream_name, messages),)

            return xautoclaim()

    await super().start(read)
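
start() selects one of three read strategies: XREADGROUP when both a group and a consumer are configured, plain XREAD otherwise, and XAUTOCLAIM when min_idle_time is set. A minimal sketch of the consumer-group path (stream, group, and consumer names are illustrative):

from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker()

@broker.subscriber(
    stream=StreamSub(
        "orders",
        group="order-workers",  # with consumer set, reads go through XREADGROUP
        consumer="worker-1",
    ),
)
async def handle_order(body: str) -> None:
    ...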

stop async #

stop() -> None

Clean up handler subscription, cancel consume task in graceful mode.

Source code in faststream/_internal/endpoint/subscriber/mixins.py
async def stop(self) -> None:
    """Clean up handler subscription, cancel consume task in graceful mode."""
    await super().stop()

    for task in self.tasks:
        if not task.done():
            task.cancel()

    self.tasks.clear()

add_call #

add_call(
    *,
    parser_: Optional[CustomCallable],
    decoder_: Optional[CustomCallable],
    middlewares_: Sequence[SubscriberMiddleware[Any]],
    dependencies_: Iterable[Dependant],
) -> Self
Source code in faststream/_internal/endpoint/subscriber/usecase.py
def add_call(
    self,
    *,
    parser_: Optional["CustomCallable"],
    decoder_: Optional["CustomCallable"],
    middlewares_: Sequence["SubscriberMiddleware[Any]"],
    dependencies_: Iterable["Dependant"],
) -> Self:
    self._call_options = _CallOptions(
        parser=parser_,
        decoder=decoder_,
        middlewares=middlewares_,
        dependencies=dependencies_,
    )
    return self
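
add_call is normally invoked by the broker machinery when a handler is registered; the sketch below only illustrates the expected keyword arguments (the subscriber object and all values are illustrative):

subscriber = broker.subscriber(stream=StreamSub("my-stream"))
subscriber.add_call(
    parser_=None,       # keep the default RedisStreamParser
    decoder_=None,      # keep the default decoder
    middlewares_=(),    # no per-call middlewares
    dependencies_=(),   # no extra dependencies
)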

consume async #

consume(msg: MsgType) -> Any

Consume a message asynchronously.

Source code in faststream/_internal/endpoint/subscriber/usecase.py
async def consume(self, msg: MsgType) -> Any:
    """Consume a message asynchronously."""
    if not self.running:
        return None

    try:
        return await self.process_message(msg)

    except StopConsume:
        # Stop handler at StopConsume exception
        await self.stop()

    except SystemExit:
        # Stop handler at `exit()` call
        await self.stop()

        if app := self._outer_config.fd_config.context.get("app"):
            app.exit()

    except Exception:  # nosec B110
        # All other exceptions were logged by CriticalLogMiddleware
        pass
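
As the source above shows, raising StopConsume inside a handler stops this subscriber gracefully. A minimal sketch, assuming StopConsume is imported from faststream.exceptions:

from faststream.exceptions import StopConsume

@broker.subscriber(stream=StreamSub("my-stream"))
async def handle(body: str) -> None:
    if body == "stop":
        raise StopConsume()  # consume() catches this and calls stop()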

process_message async #

process_message(msg: MsgType) -> Response

Execute all message processing stages.

Source code in faststream/_internal/endpoint/subscriber/usecase.py
async def process_message(self, msg: MsgType) -> "Response":
    """Execute all message processing stages."""
    context = self._outer_config.fd_config.context
    logger_state = self._outer_config.logger

    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)

        # Enter context before middlewares
        stack.enter_context(context.scope("handler_", self))
        stack.enter_context(context.scope("logger", logger_state.logger.logger))
        for k, v in self._outer_config.extra_context.items():
            stack.enter_context(context.scope(k, v))

        # enter all middlewares
        middlewares: list[BaseMiddleware] = []
        for base_m in self.__build__middlewares_stack():
            middleware = base_m(msg, context=context)
            middlewares.append(middleware)
            await middleware.__aenter__()

        cache: dict[Any, Any] = {}
        parsing_error: Exception | None = None
        for h in self.calls:
            try:
                message = await h.is_suitable(msg, cache)
            except Exception as e:
                parsing_error = e
                break

            if message is not None:
                stack.enter_context(
                    context.scope("log_context", self.get_log_context(message)),
                )
                stack.enter_context(context.scope("message", message))

                # Middlewares should be exited before scope release
                for m in middlewares:
                    stack.push_async_exit(m.__aexit__)

                result_msg = ensure_response(
                    await h.call(
                        message=message,
                        # consumer middlewares
                        _extra_middlewares=(
                            m.consume_scope for m in middlewares[::-1]
                        ),
                    ),
                )

                if not result_msg.correlation_id:
                    result_msg.correlation_id = message.correlation_id

                for p in chain(
                    self.__get_response_publisher(message),
                    h.handler._publishers,
                ):
                    await p._publish(
                        result_msg.as_publish_command(),
                        _extra_middlewares=(
                            m.publish_scope for m in middlewares[::-1]
                        ),
                    )

                # Return data for tests
                return result_msg

        # Suitable handler was not found or
        # parsing/decoding exception occurred
        for m in middlewares:
            stack.push_async_exit(m.__aexit__)

        # Reraise it to catch in tests
        if parsing_error:
            raise parsing_error

        error_msg = f"There is no suitable handler for {msg=}"
        raise SubscriberNotFound(error_msg)

    # An error was raised and processed by some middleware
    return ensure_response(None)
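
The response-publishing stage above forwards the handler's return value to every publisher attached to the handler. A minimal sketch using a stacked publisher decorator (stream and channel names are illustrative):

@broker.subscriber(stream=StreamSub("in-stream"))
@broker.publisher("out-channel")
async def handle(body: str) -> str:
    return body.upper()  # forwarded to "out-channel" by the response publisher stage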

get_one async #

get_one(
    *, timeout: float = 5.0
) -> RedisStreamMessage | None
Source code in faststream/redis/subscriber/usecases/stream_subscriber.py
@override
async def get_one(
    self,
    *,
    timeout: float = 5.0,
) -> "RedisStreamMessage | None":
    assert not self.calls, (
        "You can't use `get_one` method if subscriber has registered handlers."
    )
    if self.min_idle_time is None:
        if self.stream_sub.group and self.stream_sub.consumer:
            stream_message = await self._client.xreadgroup(
                groupname=self.stream_sub.group,
                consumername=self.stream_sub.consumer,
                streams={self.stream_sub.name: self.last_id},
                block=math.ceil(timeout * 1000),
                count=1,
            )
        else:
            stream_message = await self._client.xread(
                {self.stream_sub.name: self.last_id},
                block=math.ceil(timeout * 1000),
                count=1,
            )
        if not stream_message:
            return None

        ((stream_name, ((message_id, raw_message),)),) = stream_message
    else:
        stream_message = await self._client.xautoclaim(
            name=self.stream_sub.name,
            groupname=self.stream_sub.group,
            consumername=self.stream_sub.consumer,
            min_idle_time=self.min_idle_time,
            start_id=self.autoclaim_start_id,
            count=1,
        )
        (next_id, messages, _) = stream_message
        # Update start_id for next call
        self.autoclaim_start_id = next_id
        if not messages:
            return None
        stream_name = self.stream_sub.name.encode()
        ((message_id, raw_message),) = messages

    self.last_id = message_id.decode()

    redis_incoming_msg = DefaultStreamMessage(
        type="stream",
        channel=stream_name.decode(),
        message_ids=[message_id],
        data=raw_message,
    )

    context = self._outer_config.fd_config.context
    async_parser, async_decoder = self._get_parser_and_decoder()

    msg: RedisStreamMessage = await process_msg(  # type: ignore[assignment]
        msg=redis_incoming_msg,
        middlewares=(
            m(redis_incoming_msg, context=context) for m in self._broker_middlewares
        ),
        parser=async_parser,
        decoder=async_decoder,
    )
    return msg
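
get_one can only be used on a subscriber without registered handlers (see the assert above). A minimal sketch of one possible polling pattern, assuming a connected and started broker:

subscriber = broker.subscriber(stream=StreamSub("my-stream"))

async with broker:
    await broker.start()
    msg = await subscriber.get_one(timeout=3.0)  # None if nothing arrives in time
    if msg is not None:
        print(await msg.decode())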

get_log_context #

get_log_context(
    message: Optional[StreamMessage[Any]],
) -> dict[str, str]
Source code in faststream/redis/subscriber/usecases/stream_subscriber.py
def get_log_context(
    self,
    message: Optional["BrokerStreamMessage[Any]"],
) -> dict[str, str]:
    return self.build_log_context(
        message=message,
        channel=self.stream_sub.name,
    )

schema #

schema() -> dict[str, SubscriberSpec]
Source code in faststream/_internal/endpoint/subscriber/usecase.py
def schema(self) -> dict[str, "SubscriberSpec"]:
    self._build_fastdepends_model()
    return self.specification.get_schema()

add_task #

add_task(
    func: Callable[..., Coroutine[Any, Any, Any]],
    func_args: tuple[Any, ...] | None = None,
    func_kwargs: dict[str, Any] | None = None,
) -> Task[Any]
Source code in faststream/_internal/endpoint/subscriber/mixins.py
def add_task(
    self,
    func: Callable[..., Coroutine[Any, Any, Any]],
    func_args: tuple[Any, ...] | None = None,
    func_kwargs: dict[str, Any] | None = None,
) -> asyncio.Task[Any]:
    args = func_args or ()
    kwargs = func_kwargs or {}
    task = asyncio.create_task(func(*args, **kwargs))
    callback = TaskCallbackSupervisor(func, func_args, func_kwargs, self)
    task.add_done_callback(callback)
    self.tasks.append(task)
    return task
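
A minimal sketch of scheduling a supervised background coroutine; poll_forever is a hypothetical function and subscriber an existing subscriber instance. The task is cancelled automatically in stop():

import asyncio

async def poll_forever(interval: float) -> None:
    while True:
        await asyncio.sleep(interval)

task = subscriber.add_task(poll_forever, func_args=(5.0,))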

build_log_context staticmethod #

build_log_context(
    message: Optional[StreamMessage[Any]], channel: str = ""
) -> dict[str, str]
Source code in faststream/redis/subscriber/usecases/basic.py
@staticmethod
def build_log_context(
    message: Optional["BrokerStreamMessage[Any]"],
    channel: str = "",
) -> dict[str, str]:
    return {
        "channel": channel,
        "message_id": getattr(message, "message_id", ""),
    }
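
For example, with no message available the helper falls back to empty values:

StreamSubscriber.build_log_context(message=None, channel="my-stream")
# -> {"channel": "my-stream", "message_id": ""}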

consume_one async #

consume_one(msg: Any) -> None
Source code in faststream/redis/subscriber/usecases/basic.py
async def consume_one(self, msg: Any) -> None:
    await self.consume(msg)