Skip to content

BasePrometheusMiddleware

faststream.prometheus.middleware.BasePrometheusMiddleware #

BasePrometheusMiddleware(
    msg,
    /,
    *,
    settings_provider_factory,
    metrics_manager,
    context,
)

Bases: BaseMiddleware[PublishCommandType, AnyMsg], Generic[PublishCommandType, AnyMsg]

Source code in faststream/prometheus/middleware.py
def __init__(
    self,
    msg: AnyMsg | None,
    /,
    *,
    settings_provider_factory: Callable[
        [AnyMsg | None],
        MetricsSettingsProvider[AnyMsg, PublishCommandType] | None,
    ],
    metrics_manager: MetricsManager,
    context: "ContextRepo",
) -> None:
    """Bind the middleware to one message and its metrics collaborators.

    Args:
        msg: Message handed to the middleware (positional-only), or ``None``.
        settings_provider_factory: Called exactly once with ``msg``; whatever
            it returns (possibly ``None``) is stored as the settings provider.
        metrics_manager: Object that the scope methods report metrics to.
        context: Forwarded unchanged to the base middleware constructor.
    """
    # Resolve the provider for this particular message up front; a ``None``
    # provider makes both scope methods fall through without metrics.
    provider = settings_provider_factory(msg)
    self._settings_provider = provider
    self._metrics_manager = metrics_manager
    super().__init__(msg, context=context)

msg instance-attribute #

msg = msg

context instance-attribute #

context = context

consume_scope async #

consume_scope(call_next, msg)
Source code in faststream/prometheus/middleware.py
async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[AnyMsg]",
) -> Any:
    """Run the consume chain while recording received-message metrics.

    When no settings provider is configured, or when ``msg.source_type`` is
    ``SourceType.RESPONSE``, the call is forwarded to ``call_next`` untouched
    and nothing is recorded.

    Otherwise the method records, in order: the received-message count, the
    message size, and the in-process count; then awaits the chain; then (always,
    in ``finally``) the processing duration, the in-process decrement and the
    processed-message count labelled with the resolved status.

    Args:
        call_next: Next callable in the consume chain.
        msg: Message being consumed.

    Returns:
        Whatever ``call_next`` returns.

    Raises:
        Exception: Anything raised by the chain is re-raised after the
            metrics have been updated.
    """
    provider = self._settings_provider
    if provider is None or msg.source_type is SourceType.RESPONSE:
        return await call_next(msg)

    broker_name = provider.messaging_system
    consume_attrs = provider.get_consume_attrs_from_message(msg)
    # Every metric below carries the same broker/handler label pair.
    labels = {
        "broker": broker_name,
        "handler": consume_attrs["destination_name"],
    }
    manager = self._metrics_manager

    manager.add_received_message(
        amount=consume_attrs["messages_count"],
        **labels,
    )
    manager.observe_received_messages_size(
        size=consume_attrs["message_size"],
        **labels,
    )
    manager.add_received_message_in_process(
        amount=consume_attrs["messages_count"],
        **labels,
    )

    failure: Exception | None = None
    started_at = time.perf_counter()

    try:
        result = await call_next(await self.on_consume(msg))

    except Exception as exc:
        failure = exc

        # Ignored exceptions still influence the final status below, but
        # they are not counted in the exception metric.
        if isinstance(failure, IgnoredException):
            raise

        manager.add_received_processed_message_exception(
            exception_type=type(failure).__name__,
            **labels,
        )
        raise

    finally:
        elapsed = time.perf_counter() - started_at
        manager.observe_received_processed_message_duration(
            duration=elapsed,
            **labels,
        )
        manager.remove_received_message_in_process(
            amount=consume_attrs["messages_count"],
            **labels,
        )

        if not (msg.committed or failure):
            # Nothing committed explicitly and no failure captured.
            status = ProcessingStatus.acked
        else:
            # Precedence: explicit ack state, then the exception-type map,
            # then the generic error status.
            status = (
                PROCESSING_STATUS_BY_ACK_STATUS.get(msg.committed)  # type: ignore[arg-type]
                or PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(failure))
                or ProcessingStatus.error
            )

        manager.add_received_processed_message(
            amount=consume_attrs["messages_count"],
            status=status,
            **labels,
        )

    return result

publish_scope async #

publish_scope(call_next, cmd)
Source code in faststream/prometheus/middleware.py
async def publish_scope(
    self,
    call_next: Callable[[PublishCommandType], Awaitable[Any]],
    cmd: PublishCommandType,
) -> Any:
    """Run the publish chain while recording published-message metrics.

    When no settings provider is configured, or when ``cmd.publish_type`` is
    ``PublishType.REPLY``, the call is forwarded to ``call_next`` untouched
    and nothing is recorded.

    Otherwise the publish duration and a published-message count (sized by
    ``len(cmd.batch_bodies)`` and labelled success or error) are always
    recorded; a raised ``Exception`` is additionally counted by type.

    Args:
        call_next: Next callable in the publish chain.
        cmd: Publish command being sent.

    Returns:
        Whatever ``call_next`` returns.

    Raises:
        Exception: Anything raised by the chain is re-raised after the
            metrics have been updated.
    """
    provider = self._settings_provider
    if provider is None or cmd.publish_type is PublishType.REPLY:
        return await call_next(cmd)

    destination = provider.get_publish_destination_name_from_cmd(cmd)
    # Every metric below carries the same broker/destination label pair.
    labels = {
        "broker": provider.messaging_system,
        "destination": destination,
    }
    manager = self._metrics_manager

    failure: Exception | None = None
    started_at = time.perf_counter()

    try:
        result = await call_next(cmd)

    except Exception as exc:
        failure = exc
        manager.add_published_message_exception(
            exception_type=type(failure).__name__,
            **labels,
        )
        raise

    finally:
        elapsed = time.perf_counter() - started_at
        manager.observe_published_message_duration(
            duration=elapsed,
            **labels,
        )

        if failure:
            status = PublishingStatus.error
        else:
            status = PublishingStatus.success

        manager.add_published_message(
            amount=len(cmd.batch_bodies),
            status=status,
            **labels,
        )

    return result

on_receive async #

on_receive()

Hook to call on message receive.

Source code in faststream/_internal/middlewares.py
async def on_receive(self) -> None:
    """Hook to call on message receive.

    The base implementation has no body beyond this docstring: it performs
    no work and returns ``None``.
    """

after_processed async #

after_processed(exc_type=None, exc_val=None, exc_tb=None)

Asynchronously called after processing.

Source code in faststream/_internal/middlewares.py
async def after_processed(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> bool | None:
    """Asynchronously called after processing.

    Args:
        exc_type: Exception class, or ``None`` (the default).
        exc_val: Exception instance, or ``None`` (the default).
        exc_tb: Traceback object, or ``None`` (the default).

    Returns:
        Always ``False`` in this base implementation; none of the arguments
        are inspected.
    """
    # NOTE(review): the parameters mirror the ``__aexit__`` triple, so
    # ``False`` presumably means "do not suppress the exception" — confirm
    # against the caller.
    return False

on_consume async #

on_consume(msg)

This option was deprecated and will be removed in 0.7.0. Please, use consume_scope instead.

Source code in faststream/_internal/middlewares.py
async def on_consume(
    self,
    msg: "StreamMessage[AnyMsg]",
) -> "StreamMessage[AnyMsg]":
    """This option was deprecated and will be removed in 0.7.0. Please, use `consume_scope` instead.

    Args:
        msg: Message about to be consumed.

    Returns:
        The same ``msg`` object, unchanged, in this base implementation.
    """
    return msg

after_consume async #

after_consume(err)

This option was deprecated and will be removed in 0.7.0. Please, use consume_scope instead.

Source code in faststream/_internal/middlewares.py
async def after_consume(self, err: Exception | None) -> None:
    """This option was deprecated and will be removed in 0.7.0. Please, use `consume_scope` instead."""
    if err is not None:
        raise err

on_publish async #

on_publish(msg)

This option was deprecated and will be removed in 0.7.0. Please, use publish_scope instead.

Source code in faststream/_internal/middlewares.py
async def on_publish(
    self,
    msg: PublishCommandType,
) -> PublishCommandType:
    """This option was deprecated and will be removed in 0.7.0. Please, use `publish_scope` instead.

    Args:
        msg: Publish command about to be sent.

    Returns:
        The same ``msg`` object, unchanged, in this base implementation.
    """
    return msg

after_publish async #

after_publish(err)

This option was deprecated and will be removed in 0.7.0. Please, use publish_scope instead.

Source code in faststream/_internal/middlewares.py
async def after_publish(
    self,
    err: Exception | None,
) -> None:
    """This option was deprecated and will be removed in 0.7.0. Please, use `publish_scope` instead."""
    if err is not None:
        raise err