Bases: DefaultLoggerStorage
Source code in faststream/nats/broker/logging.py
| def __init__(self) -> None:
super().__init__()
self._max_queue_len = 0
self._max_stream_len = 0
self._max_subject_len = 4
self.logger_log_level = logging.INFO
|
logger_log_level
instance-attribute
set_level
Source code in faststream/nats/broker/logging.py
| def set_level(self, level: int) -> None:
self.logger_log_level = level
|
register_subscriber
register_subscriber(params)
Source code in faststream/nats/broker/logging.py
| def register_subscriber(self, params: dict[str, Any]) -> None:
self._max_subject_len = max(
(
self._max_subject_len,
len(params.get("subject", "")),
),
)
self._max_queue_len = max(
(
self._max_queue_len,
len(params.get("queue", "")),
),
)
self._max_stream_len = max(
(
self._max_stream_len,
len(params.get("stream", "")),
),
)
|
get_logger
Source code in faststream/nats/broker/logging.py
| def get_logger(self, *, context: "ContextRepo") -> "LoggerProto":
# TODO: generate unique logger names to not share between brokers
if not (lg := self._get_logger_ref()):
message_id_ln = 10
lg = get_broker_logger(
name="nats",
default_context={
"subject": "",
"stream": "",
"queue": "",
},
message_id_ln=message_id_ln,
fmt="".join((
"%(asctime)s %(levelname)-8s - ",
(
f"%(stream)-{self._max_stream_len}s | "
if self._max_stream_len
else ""
),
(
f"%(queue)-{self._max_queue_len}s | "
if self._max_queue_len
else ""
),
f"%(subject)-{self._max_subject_len}s | ",
f"%(message_id)-{message_id_ln}s - ",
"%(message)s",
)),
context=context,
log_level=self.logger_log_level,
)
self._logger_ref.add(lg)
return lg
|