Files
pi-kit/pikit_api/diagnostics.py
2025-12-13 17:04:32 -05:00

118 lines
3.5 KiB
Python

import datetime
import io
import json
import pathlib
from typing import Any, Dict, List, Optional
from .constants import (
API_LOG,
DEBUG_FLAG,
DIAG_DEFAULT_STATE,
DIAG_LOG_FILE,
DIAG_MAX_BYTES,
DIAG_MAX_ENTRY_CHARS,
DIAG_STATE_FILE,
)
_diag_state: Optional[Dict[str, Any]] = None
def _load_diag_state() -> Dict[str, Any]:
"""Load diagnostics state from RAM-backed storage when available."""
global _diag_state
if _diag_state is not None:
return _diag_state
try:
if DIAG_STATE_FILE.exists():
_diag_state = json.loads(DIAG_STATE_FILE.read_text())
return _diag_state
except Exception:
pass
_diag_state = DIAG_DEFAULT_STATE.copy()
return _diag_state
def _save_diag_state(enabled=None, level=None) -> Dict[str, Any]:
"""Persist diagnostics state; tolerate failures silently."""
state = _load_diag_state()
if enabled is not None:
state["enabled"] = bool(enabled)
if level in ("normal", "debug"):
state["level"] = level
try:
DIAG_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
DIAG_STATE_FILE.write_text(json.dumps(state))
except Exception:
pass
return state
def diag_log(level: str, message: str, meta: Optional[dict] = None) -> None:
"""
Append a diagnostic log line to RAM-backed file.
Skips when disabled or when debug level is off.
"""
state = _load_diag_state()
if not state.get("enabled"):
return
if level == "debug" and state.get("level") != "debug":
return
try:
ts = datetime.datetime.utcnow().isoformat() + "Z"
entry = {"ts": ts, "level": level, "msg": message}
if meta:
entry["meta"] = meta
line = json.dumps(entry, separators=(",", ":"))
if len(line) > DIAG_MAX_ENTRY_CHARS:
entry.pop("meta", None)
entry["msg"] = (message or "")[: DIAG_MAX_ENTRY_CHARS - 40] + ""
line = json.dumps(entry, separators=(",", ":"))
line_bytes = (line + "\n").encode()
DIAG_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
with DIAG_LOG_FILE.open("ab") as f:
f.write(line_bytes)
if DIAG_LOG_FILE.stat().st_size > DIAG_MAX_BYTES:
with DIAG_LOG_FILE.open("rb") as f:
f.seek(-DIAG_MAX_BYTES, io.SEEK_END)
tail = f.read()
if b"\n" in tail:
tail = tail.split(b"\n", 1)[1]
with DIAG_LOG_FILE.open("wb") as f:
f.write(tail)
except Exception:
pass
def diag_read(limit: int = 500) -> List[dict]:
"""Return latest log entries (dicts), newest first."""
if not DIAG_LOG_FILE.exists():
return []
try:
data = DIAG_LOG_FILE.read_bytes()
except Exception:
return []
lines = data.splitlines()[-limit:]
out: List[dict] = []
for line in lines:
try:
out.append(json.loads(line.decode("utf-8", errors="ignore")))
except Exception:
continue
return out[::-1]
def dbg(msg: str) -> None:
"""
Lightweight debug logger for legacy /boot/pikit-debug flag.
Mirrors into diagnostics log when enabled.
"""
if DEBUG_FLAG:
API_LOG.parent.mkdir(parents=True, exist_ok=True)
ts = datetime.datetime.utcnow().isoformat()
with API_LOG.open("a") as f:
f.write(f"[{ts}] {msg}\n")
try:
diag_log("debug", msg)
except Exception:
pass