async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[Any]",
) -> Any:
    """Run ``call_next(msg)`` inside a "process" span and record consume metrics.

    Flow, as implemented below:

    * With no settings provider configured, ``call_next(msg)`` is awaited
      directly and nothing is traced or measured.
    * For a batch message the starting context is an empty ``Context()`` and
      the links come from ``_get_msg_links(msg)``; otherwise the context is
      extracted from ``msg.headers`` and no links are used.
    * When that context is empty, a "create" span is started, placed into a
      new context and ended right away, so the "process" span has a parent.
    * The "process" span and the message baggage are stored as context
      locals; their reset tokens are appended to ``self._scope_tokens``.
    * Duration and message count are always reported through
      ``self._metrics.observe_consume``, including when the handler fails.

    Args:
        call_next: Awaitable callable that continues processing of ``msg``.
        msg: The incoming message being consumed.

    Returns:
        Whatever ``call_next(msg)`` returns.

    Raises:
        Exception: Anything raised by ``call_next`` is re-raised unchanged;
            for ``Exception`` subclasses the exception type name is added to
            the metrics attributes under ``ERROR_TYPE`` first.
    """
    if (provider := self.__settings_provider) is None:
        return await call_next(msg)

    if _is_batch_message(msg):
        links = _get_msg_links(msg)
        current_context = Context()
    else:
        links = None
        current_context = _TRACE_PROPAGATOR.extract(msg.headers)

    destination_name = provider.get_consume_destination_name(msg)
    trace_attributes = provider.get_consume_attrs_from_message(msg)
    metrics_attributes = {
        SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
        MESSAGING_DESTINATION_PUBLISH_NAME: destination_name,
    }

    if not len(current_context):
        # Nothing was propagated (or this is a batch): open and immediately
        # close a "create" span so it can act as the parent context.
        create_span = self._tracer.start_span(
            name=_create_span_name(destination_name, MessageAction.CREATE),
            kind=trace.SpanKind.CONSUMER,
            attributes=trace_attributes,
            links=links,
        )
        current_context = trace.set_span_in_context(create_span)
        create_span.end()

    self._origin_context = current_context
    start_time = time.perf_counter()

    try:
        # end_on_exit=False: the span is deliberately left open when the
        # ``with`` block exits. NOTE(review): presumably it is ended later
        # through ``self._current_span`` -- confirm against the rest of the
        # class.
        with self._tracer.start_as_current_span(
            name=_create_span_name(destination_name, MessageAction.PROCESS),
            kind=trace.SpanKind.CONSUMER,
            context=current_context,
            attributes=trace_attributes,
            end_on_exit=False,
        ) as span:
            span.set_attribute(
                SpanAttributes.MESSAGING_OPERATION,
                MessageAction.PROCESS,
            )
            self._current_span = span

            self._scope_tokens.append((
                "span",
                self.context.set_local("span", span),
            ))
            self._scope_tokens.append(
                (
                    "baggage",
                    self.context.set_local("baggage", Baggage.from_msg(msg)),
                ),
            )

            new_context = trace.set_span_in_context(span, current_context)
            token = context.attach(new_context)
            try:
                result = await call_next(msg)
            finally:
                # Always undo the attach, even when the handler raises or the
                # task is cancelled; otherwise the token is never detached.
                context.detach(token)

    except Exception as e:
        metrics_attributes[ERROR_TYPE] = type(e).__name__
        raise

    finally:
        # Metrics are reported on both the success and the failure path.
        duration = time.perf_counter() - start_time
        msg_count = trace_attributes.get(
            SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT,
            1,
        )
        self._metrics.observe_consume(metrics_attributes, duration, msg_count)

    return result