MQTTBaseSubscriber(
config: MQTTSubscriberConfig,
specification: SubscriberSpecification[Any, Any],
calls: CallsCollection[Message],
)
Bases: TasksMixin, SubscriberUsecase[Message]
Base class for all MQTT subscribers.
Source code in faststream/mqtt/subscriber/usecase.py
def __init__(
    self,
    config: "MQTTSubscriberConfig",
    specification: "SubscriberSpecification[Any, Any]",
    calls: "CallsCollection[zmqtt.Message]",
) -> None:
    """Initialise the subscriber and install a provisional parser/decoder.

    The protocol version is looked up on ``config._outer_config``. Only the
    exact value ``"3.1.1"`` selects the V3.1.1 parser; any other value, or a
    missing ``version`` attribute, selects the V5 parser.

    Emits a ``RuntimeWarning`` when ``config.ack_policy`` is
    ``AckPolicy.NACK_ON_ERROR``.
    """
    # A subscriber declared on a router can be built before include_router
    # runs, so the version may not be reachable yet. V5 is the fallback here;
    # start() resolves the parser again.
    version = getattr(config._outer_config, "version", "5.0")
    parser: MQTTBaseParser = (
        MQTTParserV311() if version == "3.1.1" else MQTTParserV5()
    )
    config.parser = parser.parse_message
    config.decoder = parser.decode_message

    super().__init__(config, specification, calls)

    self._topic = config.topic
    self._shared = config.shared
    self._qos = config.qos
    # No subscription object exists until one is created later
    self._subscription: zmqtt.Subscription | None = None

    if config.ack_policy is not AckPolicy.NACK_ON_ERROR:
        return

    warnings.warn(
        "MQTT has no nack primitive; with NACK_ON_ERROR, "
        "on error QoS 1/2 messages will not be acknowledged "
        "and the broker will redeliver them.",
        RuntimeWarning,
        stacklevel=3,
    )
lock instance-attribute
lock: AbstractContextManager[Any] = FakeContext()
extra_watcher_options: dict[str, Any] = {}
graceful_timeout instance-attribute
graceful_timeout: float | None
specification instance-attribute
specification = specification
ack_policy instance-attribute
running instance-attribute
tasks instance-attribute
tasks: list[Task[Any]] = []
add_call
add_call(
*,
parser_: Optional[CustomCallable],
decoder_: Optional[CustomCallable],
dependencies_: Iterable[Dependant],
) -> Self
Source code in faststream/_internal/endpoint/subscriber/usecase.py
def add_call(
    self,
    *,
    parser_: Optional["CustomCallable"],
    decoder_: Optional["CustomCallable"],
    dependencies_: Iterable["Dependant"],
) -> Self:
    """Record the given parser, decoder and dependencies as call options.

    The values are wrapped in a ``_CallOptions`` object stored on
    ``self._call_options``.

    Returns:
        The subscriber itself.
    """
    options = _CallOptions(
        parser=parser_,
        decoder=decoder_,
        dependencies=dependencies_,
    )
    self._call_options = options
    return self
consume async
consume(msg: MsgType) -> Any
Consume a message asynchronously.
Source code in faststream/_internal/endpoint/subscriber/usecase.py
async def consume(self, msg: MsgType) -> Any:
    """Process ``msg`` if the subscriber is running.

    Returns:
        Whatever ``process_message`` returns, or ``None`` when the
        subscriber is not running or when an exception was handled here.
    """
    if self.running:
        try:
            outcome = await self.process_message(msg)
        except StopConsume:
            # The handler asked to stop consuming
            await self.stop()
        except SystemExit:
            # `exit()` was called: stop, then ask the app (if any) to exit
            await self.stop()
            app = self._outer_config.fd_config.context.get("app")
            if app:
                app.exit()
        except Exception:  # nosec B110
            # All other exceptions were logged by CriticalLogMiddleware
            pass
        else:
            return outcome
    return None
process_message async
process_message(msg: MsgType) -> Response
Execute all message processing stages.
Source code in faststream/_internal/endpoint/subscriber/usecase.py
async def process_message(self, msg: MsgType) -> "Response":
    """Run every processing stage for one incoming message.

    The first entry of ``self.calls`` whose ``is_suitable`` returns a
    non-``None`` message handles it; its result is wrapped with
    ``ensure_response``, published to the response publishers and returned.

    Raises:
        Exception: whatever ``is_suitable`` raised while parsing/decoding.
        SubscriberNotFound: when no entry of ``self.calls`` accepted ``msg``.
    """
    outer_config = self._outer_config
    context = outer_config.fd_config.context
    logger_state = outer_config.logger

    async with AsyncExitStack() as exit_stack:
        exit_stack.enter_context(self.lock)

        # Context scopes are opened ahead of the middlewares
        exit_stack.enter_context(context.scope("handler_", self))
        exit_stack.enter_context(
            context.scope("logger", logger_state.logger.logger),
        )
        for name, value in outer_config.extra_context.items():
            exit_stack.enter_context(context.scope(name, value))

        # Instantiate and enter every middleware for this message
        entered: list[BaseMiddleware] = []
        for factory in self.__build__middlewares_stack():
            instance = factory(msg, context=context)
            entered.append(instance)
            await instance.__aenter__()

        cache: dict[Any, Any] = {}
        parsing_error: Exception | None = None
        for handler in self.calls:
            try:
                message = await handler.is_suitable(msg, cache)
            except Exception as exc:
                # Remember the failure and stop trying further handlers
                parsing_error = exc
                break

            if message is None:
                continue

            exit_stack.enter_context(
                context.scope("log_context", self.get_log_context(message)),
            )
            exit_stack.enter_context(context.scope("message", message))

            # Registered last, so the middlewares exit before the scopes
            for mw in entered:
                exit_stack.push_async_exit(mw.__aexit__)

            raw_result = await handler.call(
                message=message,
                # consumer middlewares, in reverse order
                _extra_middlewares=(mw.consume_scope for mw in entered[::-1]),
            )
            result_msg = ensure_response(raw_result)

            if not result_msg.correlation_id:
                result_msg.correlation_id = message.correlation_id

            publishers = chain(
                self.__get_response_publisher(message),
                handler.handler._publishers,
            )
            for publisher in publishers:
                await publisher._publish(
                    result_msg.as_publish_command(),
                    _extra_middlewares=(
                        mw.publish_scope for mw in entered[::-1]
                    ),
                )

            # Return data for tests
            return result_msg

        # No handler accepted the message, or parsing/decoding failed:
        # the middlewares still have to be exited by the stack
        for mw in entered:
            exit_stack.push_async_exit(mw.__aexit__)

        # Reraise it to catch in tests
        if parsing_error:
            raise parsing_error

        error_msg = f"There is no suitable handler for {msg=}"
        raise SubscriberNotFound(error_msg)

    # Reached only when an error was raised and processed by some middleware
    return ensure_response(None)
schema
Source code in faststream/_internal/endpoint/subscriber/usecase.py
def schema(self) -> dict[str, "SubscriberSpec"]:
    """Return the schema produced by this subscriber's specification.

    ``_build_fastdepends_model()`` is called first, then the result of
    ``self.specification.get_schema()`` is returned.
    """
    self._build_fastdepends_model()
    spec = self.specification
    return spec.get_schema()
add_task
add_task(
func: Callable[..., Coroutine[Any, Any, Any]],
func_args: tuple[Any, ...] | None = None,
func_kwargs: dict[str, Any] | None = None,
) -> Task[Any]
Source code in faststream/_internal/endpoint/subscriber/mixins.py
def add_task(
    self,
    func: Callable[..., Coroutine[Any, Any, Any]],
    func_args: tuple[Any, ...] | None = None,
    func_kwargs: dict[str, Any] | None = None,
) -> asyncio.Task[Any]:
    """Start ``func`` as an asyncio task and track it in ``self.tasks``.

    Args:
        func: Coroutine function to run.
        func_args: Positional arguments for ``func``; falsy means none.
        func_kwargs: Keyword arguments for ``func``; falsy means none.

    Returns:
        The created task, which has a ``TaskCallbackSupervisor`` attached
        as its done-callback.
    """
    positional = func_args if func_args else ()
    keyword = func_kwargs if func_kwargs else {}

    coroutine = func(*positional, **keyword)
    task = asyncio.create_task(coroutine)

    # The supervisor receives the arguments exactly as passed (maybe None)
    task.add_done_callback(
        TaskCallbackSupervisor(func, func_args, func_kwargs, self),
    )
    self.tasks.append(task)
    return task
build_log_context staticmethod
build_log_context(
message: StreamMessage[Message] | None, topic: str = ""
) -> dict[str, str]
Source code in faststream/mqtt/subscriber/usecase.py
| @staticmethod
def build_log_context(
message: "StreamMessage[zmqtt.Message] | None",
topic: str = "",
) -> dict[str, str]:
return {
"topic": topic,
"message_id": getattr(message, "message_id", ""),
}
|
get_log_context
get_log_context(
message: StreamMessage[Message] | None,
) -> dict[str, str]
Source code in faststream/mqtt/subscriber/usecase.py
def get_log_context(
    self,
    message: "StreamMessage[zmqtt.Message] | None",
) -> dict[str, str]:
    """Return the logging context for ``message`` on this subscriber's topic.

    Delegates to ``build_log_context`` with ``self.topic``.
    """
    build = self.build_log_context
    return build(message=message, topic=self.topic)
start async
Source code in faststream/mqtt/subscriber/usecase.py
@override
async def start(self) -> None:
    """Resolve the final parser, start the base class and begin consuming.

    A subscription and a consume-loop task are created only when handlers
    are registered in ``self.calls``.
    """
    # By now _outer_config is fully composed (include_router has run and
    # the broker's MQTTBrokerConfig is reachable through the config chain),
    # so the protocol version is looked up again.
    version = getattr(self._outer_config, "version", "5.0")
    parser: MQTTBaseParser = (
        MQTTParserV311() if version == "3.1.1" else MQTTParserV5()
    )
    self._parser = parser.parse_message
    self._decoder = parser.decode_message

    await super().start()

    if self.calls:
        await self._create_subscription()
        self.add_task(self._consume_loop)

    self._post_start()
stop async
Source code in faststream/mqtt/subscriber/usecase.py
@override
async def stop(self) -> None:
    """Stop the base class, then stop and drop the subscription if present.

    Any ``Exception`` raised while stopping the subscription is suppressed.
    """
    await super().stop()

    subscription = self._subscription
    if subscription is None:
        return

    with suppress(Exception):
        await subscription.stop()
    self._subscription = None
get_one async
get_one(
*, timeout: float = 5.0
) -> StreamMessage[Message] | None
Source code in faststream/mqtt/subscriber/usecase.py
@override
async def get_one(
    self,
    *,
    timeout: float = 5.0,
) -> "StreamMessage[zmqtt.Message] | None":
    """Wait up to ``timeout`` seconds for one message and process it.

    Only usable on a subscriber without registered handlers. The
    subscription is created and started on first use and reused afterwards.

    Returns:
        The result of ``process_msg`` for the received raw message. The raw
        message is ``None`` when nothing arrived before the timeout.
    """
    assert not self.calls, (
        "You can't use `get_one` method if subscriber has registered handlers."
    )

    if self._subscription is None:
        # Messages are auto-acknowledged only under the ACK_FIRST policy
        self._subscription = self._outer_config.client.subscribe(
            self.topic,
            qos=zmqtt.QoS(self._qos),
            auto_ack=self.ack_policy is AckPolicy.ACK_FIRST,
        )
        await self._subscription.start()

    async_parser, async_decoder = self._get_parser_and_decoder()

    # Left as None if the timeout expires before a message arrives
    raw_msg: zmqtt.Message | None = None
    with anyio.move_on_after(timeout):
        raw_msg = await self._subscription.get_message()

    context = self._outer_config.fd_config.context
    broker_middlewares = (
        factory(raw_msg, context=context)
        for factory in self._broker_middlewares
    )
    return await process_msg(
        msg=raw_msg,
        middlewares=broker_middlewares,
        parser=async_parser,
        decoder=async_decoder,
    )