Skip to content

MQTTMetricsSettingsProvider

faststream.mqtt.prometheus.provider.MQTTMetricsSettingsProvider #

MQTTMetricsSettingsProvider()

Bases: MetricsSettingsProvider[Message, MQTTPublishCommand]

Source code in faststream/mqtt/prometheus/provider.py
def __init__(self) -> None:
    """Set the messaging system label of this provider to ``"mqtt"``."""
    self.messaging_system = "mqtt"

messaging_system instance-attribute #

messaging_system = 'mqtt'

get_consume_attrs_from_message #

get_consume_attrs_from_message(
    msg: StreamMessage[Message],
) -> ConsumeAttrs
Source code in faststream/mqtt/prometheus/provider.py
def get_consume_attrs_from_message(
    self,
    msg: "StreamMessage[zmqtt.Message]",
) -> ConsumeAttrs:
    """Collect the consume-side metric attributes for a received message.

    Args:
        msg: Message wrapper; its ``raw_message.topic`` and ``body`` are read.

    Returns:
        A mapping holding the topic as ``destination_name``, the length of
        the body as ``message_size`` and a ``messages_count`` of 1.
    """
    topic = msg.raw_message.topic
    body_size = len(msg.body)
    # One call describes exactly one message, hence the constant count.
    attrs = {
        "destination_name": topic,
        "message_size": body_size,
        "messages_count": 1,
    }
    return attrs

get_publish_destination_name_from_cmd #

get_publish_destination_name_from_cmd(
    cmd: MQTTPublishCommand,
) -> str
Source code in faststream/mqtt/prometheus/provider.py
def get_publish_destination_name_from_cmd(
    self,
    cmd: MQTTPublishCommand,
) -> str:
    """Return the destination recorded on the publish command.

    Args:
        cmd: Publish command whose ``destination`` attribute is read.

    Returns:
        The value of ``cmd.destination``, unchanged.
    """
    destination_name = cmd.destination
    return destination_name