TelemetryMiddleware(
*,
settings_provider_factory,
tracer_provider=None,
meter_provider=None,
meter=None,
include_messages_counters=False,
)
Bases: Generic[PublishCommandType]
Source code in faststream/opentelemetry/middleware.py
| def __init__(
self,
*,
settings_provider_factory: Callable[
[Any],
Optional["TelemetrySettingsProvider[Any, PublishCommandType]"],
],
tracer_provider: Optional["TracerProvider"] = None,
meter_provider: Optional["MeterProvider"] = None,
meter: Optional["Meter"] = None,
include_messages_counters: bool = False,
) -> None:
self._tracer = _get_tracer(tracer_provider)
self._meter = _get_meter(meter_provider, meter)
self._metrics = _MetricsContainer(self._meter, include_messages_counters)
self._settings_provider_factory = settings_provider_factory
|