Skip to content

telemetry_attributes_provider_factory

faststream.nats.opentelemetry.provider.telemetry_attributes_provider_factory

telemetry_attributes_provider_factory(
    msg: Optional[Msg],
) -> NatsTelemetrySettingsProvider
telemetry_attributes_provider_factory(
    msg: Sequence[Msg],
) -> NatsBatchTelemetrySettingsProvider
telemetry_attributes_provider_factory(
    msg: Union[Msg, Sequence[Msg], None],
) -> (
    NatsTelemetrySettingsProvider
    | NatsBatchTelemetrySettingsProvider
)
telemetry_attributes_provider_factory(msg)
Source code in faststream/nats/opentelemetry/provider.py
def telemetry_attributes_provider_factory(
    msg: Union["Msg", Sequence["Msg"], None],
) -> NatsTelemetrySettingsProvider | NatsBatchTelemetrySettingsProvider | None:
    """Choose the telemetry settings provider that fits the shape of ``msg``.

    Args:
        msg: A single message, a sequence of messages, or ``None``.

    Returns:
        A new ``NatsBatchTelemetrySettingsProvider`` when ``msg`` is a
        ``Sequence``; a new ``NatsTelemetrySettingsProvider`` when ``msg`` is
        a ``Msg`` instance or ``None``; ``None`` for any other object.
    """
    if isinstance(msg, Sequence):
        # A sequence is handled by the batch provider.
        provider = NatsBatchTelemetrySettingsProvider()
    elif msg is None or isinstance(msg, Msg):
        # A lone message, or no message at all, uses the regular provider.
        provider = NatsTelemetrySettingsProvider()
    else:
        # KeyValue and Object Storage watch cases
        provider = None
    return provider