118 lines
3.5 KiB
Python
118 lines
3.5 KiB
Python
import datetime
|
|
import io
|
|
import json
|
|
import pathlib
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from .constants import (
|
|
API_LOG,
|
|
DEBUG_FLAG,
|
|
DIAG_DEFAULT_STATE,
|
|
DIAG_LOG_FILE,
|
|
DIAG_MAX_BYTES,
|
|
DIAG_MAX_ENTRY_CHARS,
|
|
DIAG_STATE_FILE,
|
|
)
|
|
|
|
_diag_state: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
def _load_diag_state() -> Dict[str, Any]:
|
|
"""Load diagnostics state from RAM-backed storage when available."""
|
|
global _diag_state
|
|
if _diag_state is not None:
|
|
return _diag_state
|
|
try:
|
|
if DIAG_STATE_FILE.exists():
|
|
_diag_state = json.loads(DIAG_STATE_FILE.read_text())
|
|
return _diag_state
|
|
except Exception:
|
|
pass
|
|
_diag_state = DIAG_DEFAULT_STATE.copy()
|
|
return _diag_state
|
|
|
|
|
|
def _save_diag_state(enabled=None, level=None) -> Dict[str, Any]:
|
|
"""Persist diagnostics state; tolerate failures silently."""
|
|
state = _load_diag_state()
|
|
if enabled is not None:
|
|
state["enabled"] = bool(enabled)
|
|
if level in ("normal", "debug"):
|
|
state["level"] = level
|
|
try:
|
|
DIAG_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
DIAG_STATE_FILE.write_text(json.dumps(state))
|
|
except Exception:
|
|
pass
|
|
return state
|
|
|
|
|
|
def diag_log(level: str, message: str, meta: Optional[dict] = None) -> None:
|
|
"""
|
|
Append a diagnostic log line to RAM-backed file.
|
|
Skips when disabled or when debug level is off.
|
|
"""
|
|
state = _load_diag_state()
|
|
if not state.get("enabled"):
|
|
return
|
|
if level == "debug" and state.get("level") != "debug":
|
|
return
|
|
try:
|
|
ts = datetime.datetime.utcnow().isoformat() + "Z"
|
|
entry = {"ts": ts, "level": level, "msg": message}
|
|
if meta:
|
|
entry["meta"] = meta
|
|
line = json.dumps(entry, separators=(",", ":"))
|
|
if len(line) > DIAG_MAX_ENTRY_CHARS:
|
|
entry.pop("meta", None)
|
|
entry["msg"] = (message or "")[: DIAG_MAX_ENTRY_CHARS - 40] + "…"
|
|
line = json.dumps(entry, separators=(",", ":"))
|
|
line_bytes = (line + "\n").encode()
|
|
DIAG_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
with DIAG_LOG_FILE.open("ab") as f:
|
|
f.write(line_bytes)
|
|
if DIAG_LOG_FILE.stat().st_size > DIAG_MAX_BYTES:
|
|
with DIAG_LOG_FILE.open("rb") as f:
|
|
f.seek(-DIAG_MAX_BYTES, io.SEEK_END)
|
|
tail = f.read()
|
|
if b"\n" in tail:
|
|
tail = tail.split(b"\n", 1)[1]
|
|
with DIAG_LOG_FILE.open("wb") as f:
|
|
f.write(tail)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def diag_read(limit: int = 500) -> List[dict]:
|
|
"""Return latest log entries (dicts), newest first."""
|
|
if not DIAG_LOG_FILE.exists():
|
|
return []
|
|
try:
|
|
data = DIAG_LOG_FILE.read_bytes()
|
|
except Exception:
|
|
return []
|
|
lines = data.splitlines()[-limit:]
|
|
out: List[dict] = []
|
|
for line in lines:
|
|
try:
|
|
out.append(json.loads(line.decode("utf-8", errors="ignore")))
|
|
except Exception:
|
|
continue
|
|
return out[::-1]
|
|
|
|
|
|
def dbg(msg: str) -> None:
|
|
"""
|
|
Lightweight debug logger for legacy /boot/pikit-debug flag.
|
|
Mirrors into diagnostics log when enabled.
|
|
"""
|
|
if DEBUG_FLAG:
|
|
API_LOG.parent.mkdir(parents=True, exist_ok=True)
|
|
ts = datetime.datetime.utcnow().isoformat()
|
|
with API_LOG.open("a") as f:
|
|
f.write(f"[{ts}] {msg}\n")
|
|
try:
|
|
diag_log("debug", msg)
|
|
except Exception:
|
|
pass
|