Skip to content

settings_provider_factory

faststream.confluent.prometheus.provider.settings_provider_factory

settings_provider_factory(
    msg: Union[Message, Sequence[Message], None],
) -> (
    ConfluentMetricsSettingsProvider
    | BatchConfluentMetricsSettingsProvider
)
Source code in faststream/confluent/prometheus/provider.py
def settings_provider_factory(
    msg: Union["Message", Sequence["Message"], None],
) -> ConfluentMetricsSettingsProvider | BatchConfluentMetricsSettingsProvider:
    """Build the metrics settings provider that matches the shape of ``msg``.

    Args:
        msg: A single message, a sequence of messages, or ``None``.

    Returns:
        A new ``BatchConfluentMetricsSettingsProvider`` when ``msg`` is a
        ``Sequence``; otherwise (a single message or ``None``) a new
        ``ConfluentMetricsSettingsProvider``.
    """
    # Only the type of ``msg`` is inspected; its contents are never read.
    provider_cls = (
        BatchConfluentMetricsSettingsProvider
        if isinstance(msg, Sequence)
        else ConfluentMetricsSettingsProvider
    )
    return provider_cls()