Skip to content

RabbitSubscriber

faststream.rabbit.subscriber.RabbitSubscriber #

RabbitSubscriber(config, specification, calls)

Bases: SubscriberUsecase['IncomingMessage']

A class to handle logic for RabbitMQ message consumption.

Source code in faststream/rabbit/subscriber/usecase.py
def __init__(
    self,
    config: "RabbitSubscriberConfig",
    specification: "SubscriberSpecification[Any, Any]",
    calls: "CallsCollection[IncomingMessage]",
) -> None:
    parser = AioPikaParser(pattern=config.queue.path_regex)
    config.decoder = parser.decode_message
    config.parser = parser.parse_message
    super().__init__(
        config,
        specification=specification,
        calls=calls,
    )

    self.queue = config.queue
    self.exchange = config.exchange

    self.consume_args = config.consume_args or {}

    self.__no_ack = config.ack_first

    self._consumer_tag: str | None = None
    self._queue_obj: RobustQueue | None = None
    self.channel = config.channel

lock instance-attribute #

lock = FakeContext()

extra_watcher_options instance-attribute #

extra_watcher_options = {}

graceful_timeout instance-attribute #

graceful_timeout

calls instance-attribute #

calls = calls

specification instance-attribute #

specification = specification

ack_policy instance-attribute #

ack_policy = ack_policy

running instance-attribute #

running = False

queue instance-attribute #

queue = queue

exchange instance-attribute #

exchange = exchange

consume_args instance-attribute #

consume_args = consume_args or {}

channel instance-attribute #

channel = channel

app_id property #

app_id

add_call #

add_call(*, parser_, decoder_, middlewares_, dependencies_)
Source code in faststream/_internal/endpoint/subscriber/usecase.py
def add_call(
    self,
    *,
    parser_: Optional["CustomCallable"],
    decoder_: Optional["CustomCallable"],
    middlewares_: Sequence["SubscriberMiddleware[Any]"],
    dependencies_: Iterable["Dependant"],
) -> Self:
    self._call_options = _CallOptions(
        parser=parser_,
        decoder=decoder_,
        middlewares=middlewares_,
        dependencies=dependencies_,
    )
    return self

consume async #

consume(msg)

Consume a message asynchronously.

Source code in faststream/_internal/endpoint/subscriber/usecase.py
async def consume(self, msg: MsgType) -> Any:
    """Consume a message asynchronously."""
    if not self.running:
        return None

    try:
        return await self.process_message(msg)

    except StopConsume:
        # Stop handler at StopConsume exception
        await self.stop()

    except SystemExit:
        # Stop handler at `exit()` call
        await self.stop()

        if app := self._outer_config.fd_config.context.get("app"):
            app.exit()

    except Exception:  # nosec B110
        # All other exceptions were logged by CriticalLogMiddleware
        pass

process_message async #

process_message(msg)

Execute all message processing stages.

Source code in faststream/_internal/endpoint/subscriber/usecase.py
async def process_message(self, msg: MsgType) -> "Response":
    """Execute all message processing stages."""
    context = self._outer_config.fd_config.context
    logger_state = self._outer_config.logger

    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)

        # Enter context before middlewares
        stack.enter_context(context.scope("handler_", self))
        stack.enter_context(context.scope("logger", logger_state.logger.logger))
        for k, v in self._outer_config.extra_context.items():
            stack.enter_context(context.scope(k, v))

        # enter all middlewares
        middlewares: list[BaseMiddleware] = []
        for base_m in self.__build__middlewares_stack():
            middleware = base_m(msg, context=context)
            middlewares.append(middleware)
            await middleware.__aenter__()

        cache: dict[Any, Any] = {}
        parsing_error: Exception | None = None
        for h in self.calls:
            try:
                message = await h.is_suitable(msg, cache)
            except Exception as e:
                parsing_error = e
                break

            if message is not None:
                stack.enter_context(
                    context.scope("log_context", self.get_log_context(message)),
                )
                stack.enter_context(context.scope("message", message))

                # Middlewares should be exited before scope release
                for m in middlewares:
                    stack.push_async_exit(m.__aexit__)

                result_msg = ensure_response(
                    await h.call(
                        message=message,
                        # consumer middlewares
                        _extra_middlewares=(
                            m.consume_scope for m in middlewares[::-1]
                        ),
                    ),
                )

                if not result_msg.correlation_id:
                    result_msg.correlation_id = message.correlation_id

                for p in chain(
                    self.__get_response_publisher(message),
                    h.handler._publishers,
                ):
                    await p._publish(
                        result_msg.as_publish_command(),
                        _extra_middlewares=(
                            m.publish_scope for m in middlewares[::-1]
                        ),
                    )

                # Return data for tests
                return result_msg

        # Suitable handler was not found or
        # parsing/decoding exception occurred
        for m in middlewares:
            stack.push_async_exit(m.__aexit__)

        # Reraise it to catch in tests
        if parsing_error:
            raise parsing_error

        error_msg = f"There is no suitable handler for {msg=}"
        raise SubscriberNotFound(error_msg)

    # An error was raised and processed by some middleware
    return ensure_response(None)

schema #

schema()
Source code in faststream/_internal/endpoint/subscriber/usecase.py
def schema(self) -> dict[str, "SubscriberSpec"]:
    self._build_fastdepends_model()
    return self.specification.get_schema()

routing #

routing()
Source code in faststream/rabbit/subscriber/usecase.py
def routing(self) -> str:
    return f"{self._outer_config.prefix}{self.queue.routing()}"

start async #

start()

Starts the consumer for the RabbitMQ queue.

Source code in faststream/rabbit/subscriber/usecase.py
@override
async def start(self) -> None:
    """Starts the consumer for the RabbitMQ queue."""
    await super().start()

    queue_to_bind = self.queue.add_prefix(self._outer_config.prefix)

    declarer = self._outer_config.declarer

    self._queue_obj = queue = await declarer.declare_queue(
        queue_to_bind,
        channel=self.channel,
    )

    if (
        self.exchange is not None
        and queue_to_bind.declare  # queue just getted from RMQ
        and self.exchange.name  # check Exchange is not default
    ):
        exchange = await declarer.declare_exchange(
            self.exchange,
            channel=self.channel,
        )

        await queue.bind(
            exchange,
            routing_key=queue_to_bind.routing(),
            arguments=queue_to_bind.bind_arguments,
            timeout=queue_to_bind.timeout,
            robust=self.queue.robust,
        )

    if self.calls:
        self._consumer_tag = await self._queue_obj.consume(
            # NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage
            self.consume,  # type: ignore[arg-type]
            no_ack=self.__no_ack,
            arguments=self.consume_args,
        )

    self._post_start()

stop async #

stop()
Source code in faststream/rabbit/subscriber/usecase.py
async def stop(self) -> None:
    await super().stop()

    if self._queue_obj is not None:
        if self._consumer_tag is not None:  # pragma: no branch
            if not self._queue_obj.channel.is_closed:
                await self._queue_obj.cancel(self._consumer_tag)
            self._consumer_tag = None

        self._queue_obj = None

get_one async #

get_one(*, timeout=5.0, no_ack=True)
Source code in faststream/rabbit/subscriber/usecase.py
@override
async def get_one(
    self,
    *,
    timeout: float = 5.0,
    no_ack: bool = True,
) -> "RabbitMessage | None":
    assert self._queue_obj, "You should start subscriber at first."
    assert not self.calls, (
        "You can't use `get_one` method if subscriber has registered handlers."
    )

    sleep_interval = timeout / 10

    raw_message: IncomingMessage | None = None
    with (
        contextlib.suppress(asyncio.exceptions.CancelledError),
        anyio.move_on_after(timeout),
    ):
        while (  # noqa: ASYNC110
            raw_message := await self._queue_obj.get(
                fail=False,
                no_ack=no_ack,
                timeout=timeout,
            )
        ) is None:
            await anyio.sleep(sleep_interval)

    context = self._outer_config.fd_config.context
    async_parser, async_decoder = self._get_parser_and_decoder()

    msg: RabbitMessage | None = await process_msg(  # type: ignore[assignment]
        msg=raw_message,
        middlewares=(
            m(raw_message, context=context) for m in self._broker_middlewares
        ),
        parser=async_parser,
        decoder=async_decoder,
    )
    return msg

build_log_context staticmethod #

build_log_context(message, queue, exchange=None)
Source code in faststream/rabbit/subscriber/usecase.py
@staticmethod
def build_log_context(
    message: Optional["StreamMessage[Any]"],
    queue: "RabbitQueue",
    exchange: Optional["RabbitExchange"] = None,
) -> dict[str, str]:
    return {
        "queue": queue.name,
        "exchange": getattr(exchange, "name", ""),
        "message_id": getattr(message, "message_id", ""),
    }

get_log_context #

get_log_context(message)
Source code in faststream/rabbit/subscriber/usecase.py
def get_log_context(
    self,
    message: Optional["StreamMessage[Any]"],
) -> dict[str, str]:
    return self.build_log_context(
        message=message,
        queue=self.queue,
        exchange=self.exchange,
    )