Skip to content

BaseTelemetryMiddleware

faststream.opentelemetry.middleware.BaseTelemetryMiddleware #

BaseTelemetryMiddleware(
    msg,
    /,
    *,
    tracer,
    settings_provider_factory,
    metrics_container,
    context,
)

Bases: BaseMiddleware[PublishCommandType]

Source code in faststream/opentelemetry/middleware.py
def __init__(
    self,
    msg: Any | None,
    /,
    *,
    tracer: "Tracer",
    settings_provider_factory: Callable[
        [Any],
        Optional["TelemetrySettingsProvider[Any, PublishCommandType]"],
    ],
    metrics_container: _MetricsContainer,
    context: "ContextRepo",
) -> None:
    """Initialize per-message telemetry state.

    Args:
        msg: Raw incoming broker message (positional-only); forwarded to the
            base middleware and to ``settings_provider_factory``.
        tracer: OpenTelemetry tracer used to create publish/consume spans.
        settings_provider_factory: Builds a broker-specific telemetry settings
            provider from the raw message. May return ``None``, in which case
            both ``publish_scope`` and ``consume_scope`` become pass-throughs.
        metrics_container: Shared holder of the publish/consume instruments.
        context: FastStream context repository used to expose the active span
            and baggage to user code.
    """
    super().__init__(msg, context=context)

    self._tracer = tracer
    self._metrics = metrics_container
    # Span of the message currently being processed; set in consume_scope.
    self._current_span: Span | None = None
    # Trace context extracted from the consumed message's headers.
    self._origin_context: Context | None = None
    # (key, Token) pairs for ContextRepo locals, reset after publishing.
    self._scope_tokens: list[tuple[str, Token[Any]]] = []
    # ``None`` disables all telemetry work in this middleware instance.
    self.__settings_provider = settings_provider_factory(msg)

msg instance-attribute #

msg = msg

context instance-attribute #

context = context

publish_scope async #

publish_scope(call_next, msg)
Source code in faststream/opentelemetry/middleware.py
async def publish_scope(
    self,
    call_next: "AsyncFunc",
    msg: "PublishCommandType",
) -> Any:
    """Trace and measure an outgoing publish.

    Injects the trace context and any stored baggage into the outgoing
    message headers, wraps the actual publish call in a PRODUCER span,
    and records publish duration/count metrics. When no settings provider
    exists for this broker, the message is published untouched.
    """
    if (provider := self.__settings_provider) is None:
        return await call_next(msg)

    headers = msg.headers
    current_context = context.get_current()
    destination_name = provider.get_publish_destination_name(msg)

    # Baggage stored by consume_scope is forwarded on the outgoing message.
    current_baggage: Baggage | None = self.context.get_local("baggage")
    if current_baggage:
        headers.update(current_baggage.to_headers())

    trace_attributes = provider.get_publish_attrs_from_cmd(msg)
    metrics_attributes = {
        SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
        SpanAttributes.MESSAGING_DESTINATION_NAME: destination_name,
    }

    # NOTE: a batch containing exactly one message is NOT marked as a batch
    # here (the `> 1` check) -- confirm this is the intended semantics.
    if (msg_count := len(msg.batch_bodies)) > 1:
        trace_attributes[SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT] = msg_count
        current_context = _BAGGAGE_PROPAGATOR.extract(headers, current_context)
        # Flag the outgoing headers so the consumer side can detect a batch.
        _BAGGAGE_PROPAGATOR.inject(
            headers,
            baggage.set_baggage(WITH_BATCH, True, context=current_context),
        )

    if self._current_span and self._current_span.is_recording():
        # Publishing from inside an active consume span: parent the publish
        # span on it, but inject the original upstream context into the
        # headers so the downstream consumer links to the origin trace.
        current_context = trace.set_span_in_context(
            self._current_span,
            current_context,
        )
        _TRACE_PROPAGATOR.inject(headers, context=self._origin_context)

    else:
        # No active consume span: open (and immediately close) a CREATE span
        # so the injected headers still carry a valid trace parent.
        create_span = self._tracer.start_span(
            name=_create_span_name(destination_name, MessageAction.CREATE),
            kind=trace.SpanKind.PRODUCER,
            attributes=trace_attributes,
        )
        current_context = trace.set_span_in_context(create_span)
        _TRACE_PROPAGATOR.inject(headers, context=current_context)
        create_span.end()

    start_time = time.perf_counter()

    try:
        with self._tracer.start_as_current_span(
            name=_create_span_name(destination_name, MessageAction.PUBLISH),
            kind=trace.SpanKind.PRODUCER,
            attributes=trace_attributes,
            context=current_context,
        ) as span:
            span.set_attribute(
                SpanAttributes.MESSAGING_OPERATION,
                MessageAction.PUBLISH,
            )
            msg.headers = headers
            result = await call_next(msg)

    except Exception as e:
        # Record the failure type in metrics; the exception still propagates.
        metrics_attributes[ERROR_TYPE] = type(e).__name__
        raise

    finally:
        # Publish duration is observed on both success and failure paths.
        duration = time.perf_counter() - start_time
        self._metrics.observe_publish(metrics_attributes, duration, msg_count)

    # Restore the ContextRepo locals ("span"/"baggage") set in consume_scope.
    for key, token in self._scope_tokens:
        self.context.reset_local(key, token)

    return result

consume_scope async #

consume_scope(call_next, msg)
Source code in faststream/opentelemetry/middleware.py
async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[Any]",
) -> Any:
    """Trace and measure the processing of a consumed message.

    Extracts the upstream trace context from the message headers (or builds
    span links for batch messages), runs the handler inside a CONSUMER span,
    exposes the span and baggage through the context repository, and records
    processing duration/count metrics. When no settings provider exists for
    this broker, the handler runs untouched.
    """
    if (provider := self.__settings_provider) is None:
        return await call_next(msg)

    if _is_batch_message(msg):
        # A batch may aggregate several origins: keep each one as a span
        # link rather than picking a single extracted parent context.
        links = _get_msg_links(msg)
        current_context = Context()
    else:
        links = None
        current_context = _TRACE_PROPAGATOR.extract(msg.headers)

    destination_name = provider.get_consume_destination_name(msg)
    trace_attributes = provider.get_consume_attrs_from_message(msg)
    metrics_attributes = {
        SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
        MESSAGING_DESTINATION_PUBLISH_NAME: destination_name,
    }

    if not len(current_context):
        # No upstream trace context was found: open (and immediately close)
        # a CREATE span so the PROCESS span below still has a parent.
        create_span = self._tracer.start_span(
            name=_create_span_name(destination_name, MessageAction.CREATE),
            kind=trace.SpanKind.CONSUMER,
            attributes=trace_attributes,
            links=links,
        )
        current_context = trace.set_span_in_context(create_span)
        create_span.end()

    # Remembered so publish_scope can re-inject the original upstream context.
    self._origin_context = current_context
    start_time = time.perf_counter()

    try:
        # end_on_exit=False: the span stays open until after_processed so
        # downstream middlewares/publishers can still attach to it.
        with self._tracer.start_as_current_span(
            name=_create_span_name(destination_name, MessageAction.PROCESS),
            kind=trace.SpanKind.CONSUMER,
            context=current_context,
            attributes=trace_attributes,
            end_on_exit=False,
        ) as span:
            span.set_attribute(
                SpanAttributes.MESSAGING_OPERATION,
                MessageAction.PROCESS,
            )
            self._current_span = span

            # Expose span/baggage to user code; the returned tokens are
            # reset later to restore the previous scope values.
            self._scope_tokens.append((
                "span",
                self.context.set_local("span", span),
            ))
            self._scope_tokens.append(
                (
                    "baggage",
                    self.context.set_local("baggage", Baggage.from_msg(msg)),
                ),
            )

            new_context = trace.set_span_in_context(span, current_context)
            token = context.attach(new_context)
            try:
                result = await call_next(msg)
            finally:
                # BUGFIX: detach in a finally block -- previously a raising
                # handler skipped detach and leaked the attached OTel context.
                context.detach(token)

    except Exception as e:
        # Record the failure type in metrics; the exception still propagates.
        metrics_attributes[ERROR_TYPE] = type(e).__name__
        raise

    finally:
        # Processing duration is observed on both success and failure paths.
        duration = time.perf_counter() - start_time
        msg_count = trace_attributes.get(
            SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT,
            1,
        )
        self._metrics.observe_consume(metrics_attributes, duration, msg_count)

    return result

after_processed async #

after_processed(exc_type=None, exc_val=None, exc_tb=None)
Source code in faststream/opentelemetry/middleware.py
async def after_processed(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> bool | None:
    if self._current_span and self._current_span.is_recording():
        self._current_span.end()
    return False

on_receive async #

on_receive()

Hook to call on message receive.

Source code in faststream/_internal/middlewares.py
async def on_receive(self) -> None:
    """Hook invoked when a raw message arrives; a no-op by default."""

on_consume async #

on_consume(msg)

This option is deprecated and will be removed in 0.7.0. Please use consume_scope instead.

Source code in faststream/_internal/middlewares.py
async def on_consume(
    self,
    msg: "StreamMessage[AnyMsg]",
) -> "StreamMessage[AnyMsg]":
    """Deprecated hook (removal planned for 0.7.0); use `consume_scope` instead.

    Returns the message unchanged.
    """
    return msg

after_consume async #

after_consume(err)

This option is deprecated and will be removed in 0.7.0. Please use consume_scope instead.

Source code in faststream/_internal/middlewares.py
async def after_consume(self, err: Exception | None) -> None:
    """This option was deprecated and will be removed in 0.7.0. Please, use `consume_scope` instead."""
    if err is not None:
        raise err

on_publish async #

on_publish(msg)

This option is deprecated and will be removed in 0.7.0. Please use publish_scope instead.

Source code in faststream/_internal/middlewares.py
async def on_publish(
    self,
    msg: PublishCommandType,
) -> PublishCommandType:
    """Deprecated hook (removal planned for 0.7.0); use `publish_scope` instead.

    Returns the outgoing command unchanged.
    """
    return msg

after_publish async #

after_publish(err)

This option is deprecated and will be removed in 0.7.0. Please use publish_scope instead.

Source code in faststream/_internal/middlewares.py
async def after_publish(
    self,
    err: Exception | None,
) -> None:
    """This option was deprecated and will be removed in 0.7.0. Please, use `publish_scope` instead."""
    if err is not None:
        raise err