BaseRedisMetricsSettingsProvider
faststream.redis.prometheus.provider.BaseRedisMetricsSettingsProvider
Bases: MetricsSettingsProvider[dict[str, Any], RedisPublishCommand]
Source code in faststream/redis/prometheus/provider.py
get_publish_destination_name_from_cmd
get_publish_destination_name_from_cmd(
    cmd: RedisPublishCommand,
) -> str
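The signature above maps a publish command to the destination name used as a metrics label. The sketch below only illustrates that shape and is not FastStream's implementation: `FakePublishCommand` and `SketchProvider` are hypothetical stand-ins, and the `destination` attribute is an assumption about what the command exposes.

```python
from dataclasses import dataclass


@dataclass
class FakePublishCommand:
    """Hypothetical stand-in for RedisPublishCommand (not the real class)."""

    destination: str  # assumed attribute: the channel, list, or stream name


class SketchProvider:
    """Hypothetical provider that mirrors only the documented method signature."""

    def get_publish_destination_name_from_cmd(self, cmd: FakePublishCommand) -> str:
        # Assumption: the destination name is read directly off the command.
        return cmd.destination


provider = SketchProvider()
name = provider.get_publish_destination_name_from_cmd(FakePublishCommand("orders"))
print(name)
```

Keeping the lookup in one method means a subclass could change how the label is derived without touching the code that records the metric.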
get_consume_attrs_from_message
get_consume_attrs_from_message(
    msg: StreamMessage[AnyMsg],
) -> ConsumeAttrs
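To make the return value more concrete, here is a minimal sketch of how consume attributes might be derived from a message. Everything in it is an assumption made for illustration: `SketchConsumeAttrs` and its keys, `FakeStreamMessage`, and the `"channel"` key in the raw message are hypothetical stand-ins, not the real `ConsumeAttrs` or `StreamMessage` types.

```python
from dataclasses import dataclass
from typing import Any, TypedDict


class SketchConsumeAttrs(TypedDict):
    """Hypothetical stand-in for ConsumeAttrs; the keys are assumptions."""

    destination_name: str
    message_size: int
    messages_count: int


@dataclass
class FakeStreamMessage:
    """Hypothetical stand-in for StreamMessage (not the real class)."""

    raw_message: dict[str, Any]
    body: bytes


def sketch_consume_attrs(msg: FakeStreamMessage) -> SketchConsumeAttrs:
    # Assumption: the raw Redis message dict carries a "channel" key.
    return {
        "destination_name": str(msg.raw_message.get("channel", "")),
        "message_size": len(msg.body),  # size of the payload in bytes
        "messages_count": 1,  # a single (non-batch) message
    }


attrs = sketch_consume_attrs(
    FakeStreamMessage(raw_message={"channel": "orders"}, body=b"hello")
)
print(attrs)
```

A batch consumer would presumably report the total payload size and the number of messages in the batch instead of the fixed count used here.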