Bases: SubscriberUsecase['IncomingMessage']
A class to handle logic for RabbitMQ message consumption.
Source code in faststream/rabbit/subscriber/usecase.py
| def __init__(
self,
config: "RabbitSubscriberConfig",
specification: "SubscriberSpecification[Any, Any]",
calls: "CallsCollection[IncomingMessage]",
) -> None:
parser = AioPikaParser(pattern=config.queue.path_regex)
config.decoder = parser.decode_message
config.parser = parser.parse_message
super().__init__(
config,
specification=specification,
calls=calls,
)
self.queue = config.queue
self.exchange = config.exchange
self.consume_args = config.consume_args or {}
self.__no_ack = config.ack_first
self._consumer_tag: str | None = None
self._queue_obj: RobustQueue | None = None
self.channel = config.channel
|
exchange
instance-attribute
consume_args
instance-attribute
consume_args = consume_args or {}
channel
instance-attribute
extra_watcher_options = {}
graceful_timeout
instance-attribute
specification
instance-attribute
specification = specification
ack_policy
instance-attribute
running
instance-attribute
routing
Source code in faststream/rabbit/subscriber/usecase.py
| def routing(self) -> str:
return f"{self._outer_config.prefix}{self.queue.routing()}"
|
start
async
Starts the consumer for the RabbitMQ queue.
Source code in faststream/rabbit/subscriber/usecase.py
| @override
async def start(self) -> None:
"""Starts the consumer for the RabbitMQ queue."""
await super().start()
queue_to_bind = self.queue.add_prefix(self._outer_config.prefix)
declarer = self._outer_config.declarer
self._queue_obj = queue = await declarer.declare_queue(
queue_to_bind,
channel=self.channel,
)
if (
self.exchange is not None
and queue_to_bind.declare # queue just getted from RMQ
and self.exchange.name # check Exchange is not default
):
exchange = await declarer.declare_exchange(
self.exchange,
channel=self.channel,
)
await queue.bind(
exchange,
routing_key=queue_to_bind.routing(),
arguments=queue_to_bind.bind_arguments,
timeout=queue_to_bind.timeout,
robust=self.queue.robust,
)
if self.calls:
self._consumer_tag = await self._queue_obj.consume(
# NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage
self.consume, # type: ignore[arg-type]
no_ack=self.__no_ack,
arguments=self.consume_args,
)
self._post_start()
|
stop
async
Source code in faststream/rabbit/subscriber/usecase.py
| async def stop(self) -> None:
await super().stop()
if self._queue_obj is not None:
if self._consumer_tag is not None: # pragma: no branch
if not self._queue_obj.channel.is_closed:
await self._queue_obj.cancel(self._consumer_tag)
self._consumer_tag = None
self._queue_obj = None
|
get_one
async
get_one(*, timeout=5.0, no_ack=True)
Source code in faststream/rabbit/subscriber/usecase.py
| @override
async def get_one(
self,
*,
timeout: float = 5.0,
no_ack: bool = True,
) -> "RabbitMessage | None":
assert self._queue_obj, "You should start subscriber at first."
assert not self.calls, (
"You can't use `get_one` method if subscriber has registered handlers."
)
sleep_interval = timeout / 10
raw_message: IncomingMessage | None = None
with (
contextlib.suppress(asyncio.exceptions.CancelledError),
anyio.move_on_after(timeout),
):
while ( # noqa: ASYNC110
raw_message := await self._queue_obj.get(
fail=False,
no_ack=no_ack,
timeout=timeout,
)
) is None:
await anyio.sleep(sleep_interval)
context = self._outer_config.fd_config.context
async_parser, async_decoder = self._get_parser_and_decoder()
msg: RabbitMessage | None = await process_msg( # type: ignore[assignment]
msg=raw_message,
middlewares=(
m(raw_message, context=context) for m in self._broker_middlewares
),
parser=async_parser,
decoder=async_decoder,
)
return msg
|
build_log_context
staticmethod
build_log_context(message, queue, exchange=None)
Source code in faststream/rabbit/subscriber/usecase.py
| @staticmethod
def build_log_context(
message: Optional["StreamMessage[Any]"],
queue: "RabbitQueue",
exchange: Optional["RabbitExchange"] = None,
) -> dict[str, str]:
return {
"queue": queue.name,
"exchange": getattr(exchange, "name", ""),
"message_id": getattr(message, "message_id", ""),
}
|
get_log_context
Source code in faststream/rabbit/subscriber/usecase.py
| def get_log_context(
self,
message: Optional["StreamMessage[Any]"],
) -> dict[str, str]:
return self.build_log_context(
message=message,
queue=self.queue,
exchange=self.exchange,
)
|
add_call
add_call(*, parser_, decoder_, middlewares_, dependencies_)
Source code in faststream/_internal/endpoint/subscriber/usecase.py
| def add_call(
self,
*,
parser_: Optional["CustomCallable"],
decoder_: Optional["CustomCallable"],
middlewares_: Sequence["SubscriberMiddleware[Any]"],
dependencies_: Iterable["Dependant"],
) -> Self:
self._call_options = _CallOptions(
parser=parser_,
decoder=decoder_,
middlewares=middlewares_,
dependencies=dependencies_,
)
return self
|
consume
async
Consume a message asynchronously.
Source code in faststream/_internal/endpoint/subscriber/usecase.py
| async def consume(self, msg: MsgType) -> Any:
"""Consume a message asynchronously."""
if not self.running:
return None
try:
return await self.process_message(msg)
except StopConsume:
# Stop handler at StopConsume exception
await self.stop()
except SystemExit:
# Stop handler at `exit()` call
await self.stop()
if app := self._outer_config.fd_config.context.get("app"):
app.exit()
except Exception: # nosec B110
# All other exceptions were logged by CriticalLogMiddleware
pass
|
process_message
async
Execute all message processing stages.
Source code in faststream/_internal/endpoint/subscriber/usecase.py
| async def process_message(self, msg: MsgType) -> "Response":
"""Execute all message processing stages."""
context = self._outer_config.fd_config.context
logger_state = self._outer_config.logger
async with AsyncExitStack() as stack:
stack.enter_context(self.lock)
# Enter context before middlewares
stack.enter_context(context.scope("handler_", self))
stack.enter_context(context.scope("logger", logger_state.logger.logger))
for k, v in self._outer_config.extra_context.items():
stack.enter_context(context.scope(k, v))
# enter all middlewares
middlewares: list[BaseMiddleware] = []
for base_m in self.__build__middlewares_stack():
middleware = base_m(msg, context=context)
middlewares.append(middleware)
await middleware.__aenter__()
cache: dict[Any, Any] = {}
parsing_error: Exception | None = None
for h in self.calls:
try:
message = await h.is_suitable(msg, cache)
except Exception as e:
parsing_error = e
break
if message is not None:
stack.enter_context(
context.scope("log_context", self.get_log_context(message)),
)
stack.enter_context(context.scope("message", message))
# Middlewares should be exited before scope release
for m in middlewares:
stack.push_async_exit(m.__aexit__)
result_msg = ensure_response(
await h.call(
message=message,
# consumer middlewares
_extra_middlewares=(
m.consume_scope for m in middlewares[::-1]
),
),
)
if not result_msg.correlation_id:
result_msg.correlation_id = message.correlation_id
for p in chain(
self.__get_response_publisher(message),
h.handler._publishers,
):
await p._publish(
result_msg.as_publish_command(),
_extra_middlewares=(
m.publish_scope for m in middlewares[::-1]
),
)
# Return data for tests
return result_msg
# Suitable handler was not found or
# parsing/decoding exception occurred
for m in middlewares:
stack.push_async_exit(m.__aexit__)
# Reraise it to catch in tests
if parsing_error:
raise parsing_error
error_msg = f"There is no suitable handler for {msg=}"
raise SubscriberNotFound(error_msg)
# An error was raised and processed by some middleware
return ensure_response(None)
|
schema
Source code in faststream/_internal/endpoint/subscriber/usecase.py
| def schema(self) -> dict[str, "SubscriberSpec"]:
self._build_fastdepends_model()
return self.specification.get_schema()
|