PrometheusMiddleware(
*,
settings_provider_factory: Callable[
[AnyMsg | None],
MetricsSettingsProvider[AnyMsg, PublishCommandType]
| None,
],
registry: CollectorRegistry,
app_name: str = EMPTY,
metrics_prefix: str = "faststream",
received_messages_size_buckets: Sequence[float]
| None = None,
custom_labels: dict[str, str | Callable[[Any], str]]
| None = None,
)
Bases: Generic[PublishCommandType, AnyMsg]
Source code in faststream/prometheus/middleware.py
def __init__(
    self,
    *,
    settings_provider_factory: Callable[
        [AnyMsg | None],
        MetricsSettingsProvider[AnyMsg, PublishCommandType] | None,
    ],
    registry: "CollectorRegistry",
    app_name: str = EMPTY,
    metrics_prefix: str = "faststream",
    received_messages_size_buckets: Sequence[float] | None = None,
    custom_labels: dict[str, str | Callable[[Any], str]] | None = None,
) -> None:
    """Set up the metrics container, the metrics manager and custom labels.

    All arguments are keyword-only.

    Args:
        settings_provider_factory: Callable stored on the instance as-is;
            it is not invoked here.
        registry: Collector registry handed to ``MetricsContainer``.
        app_name: Application name handed to ``MetricsManager``. When left
            at the ``EMPTY`` sentinel, ``metrics_prefix`` is used instead.
        metrics_prefix: Prefix forwarded to ``MetricsContainer``.
        received_messages_size_buckets: Forwarded unchanged to
            ``MetricsContainer`` (``None`` is passed through as ``None``).
        custom_labels: Optional mapping of label name to either a plain
            value or a callable. ``None`` is treated as "no custom labels".
    """
    # Resolve the optional mapping once; every use below shares it.
    labels = custom_labels or {}

    # Identity check against the sentinel: only an omitted ``app_name``
    # falls back to the prefix, any explicitly passed value is kept.
    resolved_app_name = metrics_prefix if app_name is EMPTY else app_name

    self._settings_provider_factory = settings_provider_factory

    container = MetricsContainer(
        registry,
        metrics_prefix=metrics_prefix,
        received_messages_size_buckets=received_messages_size_buckets,
        # Label names keep the insertion order of ``custom_labels``.
        custom_label_names=tuple(labels),
    )
    self._metrics_container = container
    self._metrics_manager = MetricsManager(
        container,
        app_name=resolved_app_name,
    )

    # Split custom labels by kind: callables go to the dynamic mapping,
    # everything else to the static one.
    # NOTE(review): presumably the dynamic callables are evaluated later,
    # per message - confirm against the code that reads _dynamic_labels.
    self._static_labels = {}
    self._dynamic_labels = {}
    for label_name, label_value in labels.items():
        target = (
            self._dynamic_labels
            if callable(label_value)
            else self._static_labels
        )
        target[label_name] = label_value