Skip to content

PrometheusMiddleware

faststream.prometheus.PrometheusMiddleware

PrometheusMiddleware(
    *,
    settings_provider_factory: Callable[
        [AnyMsg | None],
        MetricsSettingsProvider[AnyMsg, PublishCommandType]
        | None,
    ],
    registry: CollectorRegistry,
    app_name: str = EMPTY,
    metrics_prefix: str = "faststream",
    received_messages_size_buckets: Sequence[float]
    | None = None,
    custom_labels: dict[str, str | Callable[[Any], str]]
    | None = None,
)

Bases: Generic[PublishCommandType, AnyMsg]

Source code in faststream/prometheus/middleware.py
def __init__(
    self,
    *,
    settings_provider_factory: Callable[
        [AnyMsg | None],
        MetricsSettingsProvider[AnyMsg, PublishCommandType] | None,
    ],
    registry: "CollectorRegistry",
    app_name: str = EMPTY,
    metrics_prefix: str = "faststream",
    received_messages_size_buckets: Sequence[float] | None = None,
    custom_labels: dict[str, str | Callable[[Any], str]] | None = None,
) -> None:
    """Wire up the metrics container, the metrics manager and custom labels.

    Args:
        settings_provider_factory: Callable stored on the instance as-is;
            it takes a message (or ``None``) and returns a settings
            provider or ``None``.
        registry: Collector registry forwarded to ``MetricsContainer``.
        app_name: Name forwarded to ``MetricsManager``. When left at the
            ``EMPTY`` sentinel, ``metrics_prefix`` is used in its place.
        metrics_prefix: Prefix forwarded to ``MetricsContainer``.
        received_messages_size_buckets: Optional bucket boundaries
            forwarded unchanged to ``MetricsContainer``.
        custom_labels: Optional mapping of label name to either a fixed
            value or a callable. Its keys are handed to
            ``MetricsContainer`` as ``custom_label_names``; its entries
            are split into static and dynamic labels on the instance.
    """
    # Treat a missing mapping as empty once, so later steps need no None checks.
    labels = custom_labels or {}

    # The sentinel is compared by identity: only an omitted app_name falls back.
    resolved_app_name = metrics_prefix if app_name is EMPTY else app_name

    self._settings_provider_factory = settings_provider_factory

    container = MetricsContainer(
        registry,
        metrics_prefix=metrics_prefix,
        received_messages_size_buckets=received_messages_size_buckets,
        custom_label_names=tuple(labels),
    )
    self._metrics_container = container
    self._metrics_manager = MetricsManager(
        container,
        app_name=resolved_app_name,
    )

    self._static_labels = {}
    self._dynamic_labels = {}

    # Callable values go to the dynamic labels; everything else is static.
    for label_name, label_value in labels.items():
        target = (
            self._dynamic_labels if callable(label_value) else self._static_labels
        )
        target[label_name] = label_value