Skip to content

settings_provider_factory

faststream.redis.prometheus.provider.settings_provider_factory

settings_provider_factory(msg)
Source code in faststream/redis/prometheus/provider.py
def settings_provider_factory(
    msg: dict[str, Any] | None,
) -> RedisMetricsSettingsProvider | BatchRedisMetricsSettingsProvider:
    """Build the metrics settings provider that matches *msg*.

    Args:
        msg: The raw message mapping, or ``None`` when no message is
            available. Only its ``"type"`` entry is inspected; a missing
            entry is treated as an empty string.

    Returns:
        A new ``BatchRedisMetricsSettingsProvider`` when the message's
        ``"type"`` value starts with ``"b"``; otherwise a new
        ``RedisMetricsSettingsProvider`` (including when *msg* is ``None``).
    """
    # No message at all: fall back to the non-batch provider.
    if msg is None:
        return RedisMetricsSettingsProvider()

    message_type = msg.get("type", "")
    # A leading "b" in the type is what selects the batch provider.
    provider_cls = (
        BatchRedisMetricsSettingsProvider
        if message_type.startswith("b")
        else RedisMetricsSettingsProvider
    )
    return provider_cls()