Skip to content

MetricsManager

faststream.prometheus.manager.MetricsManager #

MetricsManager(
    container: MetricsContainer,
    *,
    app_name: str = "faststream",
)
Source code in faststream/prometheus/manager.py
def __init__(
    self,
    container: MetricsContainer,
    *,
    app_name: str = "faststream",
) -> None:
    """Keep the metrics container and application name on the instance.

    Args:
        container: Object holding the metric instances; stored as
            ``self._container``.
        app_name: Keyword-only application name; stored as
            ``self._app_name``. Defaults to ``"faststream"``.
    """
    self._container, self._app_name = container, app_name

add_received_message #

add_received_message(
    broker: str,
    handler: str,
    amount: int = 1,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def add_received_message(
    self,
    broker: str,
    handler: str,
    amount: int = 1,
    **custom_labels: str,
) -> None:
    """Increase the ``received_messages_total`` metric by ``amount``.

    Args:
        broker: Value for the ``broker`` label.
        handler: Value for the ``handler`` label.
        amount: Increment passed to ``inc``; defaults to 1.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.received_messages_total
    # The ``app_name`` label always comes from the manager instance.
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        **custom_labels,
    )
    labelled.inc(amount)

observe_received_messages_size #

observe_received_messages_size(
    broker: str,
    handler: str,
    size: int,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def observe_received_messages_size(
    self,
    broker: str,
    handler: str,
    size: int,
    **custom_labels: str,
) -> None:
    """Record ``size`` on the ``received_messages_size_bytes`` metric.

    Args:
        broker: Value for the ``broker`` label.
        handler: Value for the ``handler`` label.
        size: Value passed to ``observe``.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.received_messages_size_bytes
    # The ``app_name`` label always comes from the manager instance.
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        **custom_labels,
    )
    labelled.observe(size)

add_received_message_in_process #

add_received_message_in_process(
    broker: str,
    handler: str,
    amount: int = 1,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def add_received_message_in_process(
    self,
    broker: str,
    handler: str,
    amount: int = 1,
    **custom_labels: str,
) -> None:
    """Increase the ``received_messages_in_process`` metric by ``amount``.

    Args:
        broker: Value for the ``broker`` label.
        handler: Value for the ``handler`` label.
        amount: Increment passed to ``inc``; defaults to 1.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.received_messages_in_process
    # The ``app_name`` label always comes from the manager instance.
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        **custom_labels,
    )
    labelled.inc(amount)

remove_received_message_in_process #

remove_received_message_in_process(
    broker: str,
    handler: str,
    amount: int = 1,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def remove_received_message_in_process(
    self,
    broker: str,
    handler: str,
    amount: int = 1,
    **custom_labels: str,
) -> None:
    """Decrease the ``received_messages_in_process`` metric by ``amount``.

    Args:
        broker: Value for the ``broker`` label.
        handler: Value for the ``handler`` label.
        amount: Decrement passed to ``dec``; defaults to 1.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.received_messages_in_process
    # The ``app_name`` label always comes from the manager instance.
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        **custom_labels,
    )
    labelled.dec(amount)

add_received_processed_message #

add_received_processed_message(
    broker: str,
    handler: str,
    status: ProcessingStatus,
    amount: int = 1,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def add_received_processed_message(
    self,
    broker: str,
    handler: str,
    status: ProcessingStatus,
    amount: int = 1,
    **custom_labels: str,
) -> None:
    """Increase the ``received_processed_messages_total`` metric by ``amount``.

    Args:
        broker: Value for the ``broker`` label.
        handler: Value for the ``handler`` label.
        status: Processing status; its ``.value`` becomes the ``status`` label.
        amount: Increment passed to ``inc``; defaults to 1.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.received_processed_messages_total
    # The label carries the status' ``.value``, not the status object itself.
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        status=status.value,
        **custom_labels,
    )
    labelled.inc(amount)

observe_received_processed_message_duration #

observe_received_processed_message_duration(
    duration: float,
    broker: str,
    handler: str,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def observe_received_processed_message_duration(
    self,
    duration: float,
    broker: str,
    handler: str,
    **custom_labels: str,
) -> None:
    """Record ``duration`` on ``received_processed_messages_duration_seconds``.

    Args:
        duration: Value passed to ``observe``.
        broker: Value for the ``broker`` label.
        handler: Value for the ``handler`` label.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.received_processed_messages_duration_seconds
    # The ``app_name`` label always comes from the manager instance.
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        **custom_labels,
    )
    labelled.observe(duration)

add_received_processed_message_exception #

add_received_processed_message_exception(
    broker: str,
    handler: str,
    exception_type: str,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def add_received_processed_message_exception(
    self,
    broker: str,
    handler: str,
    exception_type: str,
    **custom_labels: str,
) -> None:
    """Increase ``received_processed_messages_exceptions_total`` once.

    Args:
        broker: Value for the ``broker`` label.
        handler: Value for the ``handler`` label.
        exception_type: Value for the ``exception_type`` label.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.received_processed_messages_exceptions_total
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        exception_type=exception_type,
        **custom_labels,
    )
    # ``inc`` is called without an argument, so the metric's own default applies.
    labelled.inc()

add_published_message #

add_published_message(
    broker: str,
    destination: str,
    status: PublishingStatus,
    amount: int = 1,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def add_published_message(
    self,
    broker: str,
    destination: str,
    status: PublishingStatus,
    amount: int = 1,
    **custom_labels: str,
) -> None:
    """Increase the ``published_messages_total`` metric by ``amount``.

    Args:
        broker: Value for the ``broker`` label.
        destination: Value for the ``destination`` label.
        status: Publishing status; its ``.value`` becomes the ``status`` label.
        amount: Increment passed to ``inc``; defaults to 1.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.published_messages_total
    # The label carries the status' ``.value``, not the status object itself.
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        destination=destination,
        status=status.value,
        **custom_labels,
    )
    labelled.inc(amount)

observe_published_message_duration #

observe_published_message_duration(
    duration: float,
    broker: str,
    destination: str,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def observe_published_message_duration(
    self,
    duration: float,
    broker: str,
    destination: str,
    **custom_labels: str,
) -> None:
    """Record ``duration`` on the ``published_messages_duration_seconds`` metric.

    Args:
        duration: Value passed to ``observe``.
        broker: Value for the ``broker`` label.
        destination: Value for the ``destination`` label.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.published_messages_duration_seconds
    # The ``app_name`` label always comes from the manager instance.
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        destination=destination,
        **custom_labels,
    )
    labelled.observe(duration)

add_published_message_exception #

add_published_message_exception(
    broker: str,
    destination: str,
    exception_type: str,
    **custom_labels: str,
) -> None
Source code in faststream/prometheus/manager.py
def add_published_message_exception(
    self,
    broker: str,
    destination: str,
    exception_type: str,
    **custom_labels: str,
) -> None:
    """Increase the ``published_messages_exceptions_total`` metric once.

    Args:
        broker: Value for the ``broker`` label.
        destination: Value for the ``destination`` label.
        exception_type: Value for the ``exception_type`` label.
        **custom_labels: Extra label name/value pairs forwarded to ``labels``.
    """
    metric = self._container.published_messages_exceptions_total
    labelled = metric.labels(
        app_name=self._app_name,
        broker=broker,
        destination=destination,
        exception_type=exception_type,
        **custom_labels,
    )
    # ``inc`` is called without an argument, so the metric's own default applies.
    labelled.inc()