Skip to content

settings_provider_factory

faststream.nats.prometheus.provider.settings_provider_factory #

settings_provider_factory(msg)
Source code in faststream/nats/prometheus/provider.py
def settings_provider_factory(
    msg: Union["Msg", Sequence["Msg"], None],
) -> NatsMetricsSettingsProvider | BatchNatsMetricsSettingsProvider | None:
    if isinstance(msg, Sequence):
        return BatchNatsMetricsSettingsProvider()
    if isinstance(msg, Msg) or msg is None:
        return NatsMetricsSettingsProvider()
    # KeyValue and Object Storage watch cases
    return None