MQTTTelemetrySettingsProvider
faststream.mqtt.opentelemetry.provider.MQTTTelemetrySettingsProvider #
Bases: TelemetrySettingsProvider[Message, MQTTPublishCommand]
Source code in faststream/mqtt/opentelemetry/provider.py
get_consume_attrs_from_message #
get_consume_attrs_from_message(
msg: StreamMessage[Message],
) -> dict[str, Any]
Source code in faststream/mqtt/opentelemetry/provider.py
get_consume_destination_name #
get_consume_destination_name(
msg: StreamMessage[Message],
) -> str
get_publish_attrs_from_cmd #
get_publish_attrs_from_cmd(
cmd: MQTTPublishCommand,
) -> dict[str, Any]