NatsBatchTelemetrySettingsProvider
faststream.nats.opentelemetry.provider.NatsBatchTelemetrySettingsProvider
Bases: BaseNatsTelemetrySettingsProvider[list['Msg']]
Source code in faststream/nats/opentelemetry/provider.py
get_consume_attrs_from_message
get_consume_attrs_from_message(
msg: StreamMessage[list[Msg]],
) -> dict[str, Any]
Source code in faststream/nats/opentelemetry/provider.py
get_consume_destination_name
get_consume_destination_name(
msg: StreamMessage[list[Msg]],
) -> str
get_publish_attrs_from_cmd
get_publish_attrs_from_cmd(
cmd: PublishCommand,
) -> dict[str, Any]