KafkaParamsStorage

faststream.kafka.broker.logging.KafkaParamsStorage #

KafkaParamsStorage()

Bases: DefaultLoggerStorage

Source code in faststream/kafka/broker/logging.py
def __init__(self) -> None:
    super().__init__()

    self._max_topic_len = 4
    self._max_group_len = 0

    self.logger_log_level = logging.INFO
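
The constructor only seeds the formatting state: a minimum topic column width of 4 characters, no reserved width for the consumer-group column, and an INFO default log level. A minimal sketch of creating the storage directly (normally the broker does this for you; the import path is the module shown above):

import logging

from faststream.kafka.broker.logging import KafkaParamsStorage

storage = KafkaParamsStorage()
assert storage.logger_log_level == logging.INFO  # default level set in __init__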

logger_log_level instance-attribute #

logger_log_level = INFO

set_level #

set_level(level)
Source code in faststream/kafka/broker/logging.py
def set_level(self, level: int) -> None:
    self.logger_log_level = level
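
set_level only records the level; it is applied when get_logger first builds the broker logger (see below), and that logger is cached afterwards, so the level should be set before the first get_logger call. A short illustrative sketch:

import logging

storage = KafkaParamsStorage()
storage.set_level(logging.DEBUG)  # the broker logger will be created with DEBUG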

register_subscriber #

register_subscriber(params)
Source code in faststream/kafka/broker/logging.py
def register_subscriber(self, params: dict[str, Any]) -> None:
    self._max_topic_len = max(
        (
            self._max_topic_len,
            len(params.get("topic", "")),
        ),
    )
    self._max_group_len = max(
        (
            self._max_group_len,
            len(params.get("group_id", "")),
        ),
    )
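
Registering a subscriber can only widen the tracked column widths: the maximum topic and group_id lengths seen so far are kept and later used as padding widths in the log format (see get_logger below). An illustrative sketch using the private counters:

storage = KafkaParamsStorage()
storage.register_subscriber({"topic": "orders", "group_id": "billing-service"})
storage.register_subscriber({"topic": "payments-events"})  # no group_id -> group width unchanged

assert storage._max_topic_len == 15  # len("payments-events")
assert storage._max_group_len == 15  # len("billing-service")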

get_logger #

get_logger(*, context)
Source code in faststream/kafka/broker/logging.py
def get_logger(self, *, context: "ContextRepo") -> "LoggerProto":
    message_id_ln = 10

    # TODO: generate unique logger names to not share between brokers
    if not (lg := self._get_logger_ref()):
        lg = get_broker_logger(
            name="kafka",
            default_context={
                "topic": "",
                "group_id": "",
            },
            message_id_ln=message_id_ln,
            fmt="".join((
                "%(asctime)s %(levelname)-8s - ",
                f"%(topic)-{self._max_topic_len}s | ",
                (
                    f"%(group_id)-{self._max_group_len}s | "
                    if self._max_group_len
                    else ""
                ),
                f"%(message_id)-{message_id_ln}s ",
                "- %(message)s",
            )),
            context=context,
            log_level=self.logger_log_level,
        )
        self._logger_ref.add(lg)

    return lg
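
Putting it together: the first get_logger call builds a broker logger whose format pads the topic, group_id, and message-id columns to the tracked widths, and the group column is included only if a group_id was ever registered. Inside a running subscriber the topic and group_id fields are filled from the context; the defaults here are empty strings. A minimal sketch, assuming ContextRepo is importable from the top-level faststream package (the broker normally supplies its own context):

import logging

from faststream import ContextRepo
from faststream.kafka.broker.logging import KafkaParamsStorage

storage = KafkaParamsStorage()
storage.set_level(logging.DEBUG)
storage.register_subscriber({"topic": "orders", "group_id": "billing"})

logger = storage.get_logger(context=ContextRepo())
# The resulting format is roughly:
# "%(asctime)s %(levelname)-8s - %(topic)-6s | %(group_id)-7s | %(message_id)-10s - %(message)s"
# (6 = len("orders"), 7 = len("billing"), 10 = fixed message-id width)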