Files
pi-kit/pikit_api/status.py
2025-12-13 17:04:32 -05:00

104 lines
3.2 KiB
Python

import os
import pathlib
import shutil
import socket
import subprocess
from typing import Any, Dict, List
from .auto_updates import auto_updates_state, read_updates_config
from .constants import CORE_NAME, CORE_PORTS, READY_FILE
from .helpers import default_host, detect_https, normalize_path, port_online, reboot_required
from .services import load_services, ufw_status_allows
def _cpu_temp():
for path in ("/sys/class/thermal/thermal_zone0/temp",):
p = pathlib.Path(path)
if p.exists():
try:
return float(p.read_text().strip()) / 1000.0
except Exception:
continue
return None
def _os_version():
os_ver = "DietPi"
try:
for line in pathlib.Path("/etc/os-release").read_text().splitlines():
if line.startswith("PRETTY_NAME="):
os_ver = line.split("=", 1)[1].strip().strip('"')
break
except Exception:
pass
return os_ver
def _lan_ip():
try:
out = subprocess.check_output(["hostname", "-I"], text=True).strip()
return out.split()[0] if out else None
except Exception:
return None
def list_services_with_health():
    """Return a copy of every loaded service, annotated with health flags.

    For each service that has a truthy "port", two keys are added:
    "online" (result of ``port_online`` against 127.0.0.1) and
    "firewall_open" (result of ``ufw_status_allows``). Services without a
    port are copied through unchanged.
    """

    def _annotate(raw):
        # Copy so the object returned by load_services() is never mutated.
        record = dict(raw)
        port = record.get("port")
        if port:
            record["online"] = port_online("127.0.0.1", port)
            record["firewall_open"] = ufw_status_allows(port)
        return record

    return [_annotate(raw) for raw in load_services()]
def list_services_for_ui():
    """Return a copy of every loaded service, enriched for display.

    Services with a truthy "port" gain "online", "firewall_open", "scheme"
    and "url" keys. An existing truthy "scheme" is kept; otherwise the
    scheme is "https" when ``detect_https`` reports true for the host/port
    and "http" if not. Services without a port are copied through unchanged.
    """
    entries = []
    for raw in load_services():
        # Copy so the object returned by load_services() is never mutated.
        entry = dict(raw)
        port = entry.get("port")
        if not port:
            entries.append(entry)
            continue

        entry["online"] = port_online("127.0.0.1", port)
        entry["firewall_open"] = ufw_status_allows(port)

        host = default_host()
        path = normalize_path(entry.get("path"))
        scheme = entry.get("scheme")
        if not scheme:
            # No explicit scheme configured: probe the port to decide.
            scheme = "https" if detect_https(host, port) else "http"

        entry["scheme"] = scheme
        entry["url"] = f"{scheme}://{host}:{port}{path}"
        entries.append(entry)
    return entries
def _read_uptime_seconds() -> float:
    """Return the first field of /proc/uptime (seconds since boot) as a float."""
    with open("/proc/uptime") as fh:
        return float(fh.read().split()[0])


def _read_meminfo() -> Dict[str, int]:
    """Parse /proc/meminfo into a mapping of field name to its integer value.

    Only the leading number of each line is kept (the unit suffix is
    dropped). Lines that do not look like ``Key: <int> ...`` are skipped.
    """
    fields: Dict[str, int] = {}
    with open("/proc/meminfo") as fh:
        for line in fh:
            key, _, rest = line.partition(":")
            tokens = rest.split()
            if not tokens:
                continue  # no colon or no value on this line
            try:
                fields[key] = int(tokens[0])
            except ValueError:
                continue  # non-numeric value; ignore this field
    return fields


def collect_status() -> Dict[str, Any]:
    """Gather a snapshot of host, resource, update and service status.

    Returns:
        A dict with the keys "hostname", "uptime_seconds", "load" (1/5/15
        minute averages), "memory_mb", "disk_mb" (for "/"), "cpu_temp_c",
        "lan_ip", "os_version", "auto_updates_enabled", "auto_updates",
        "updates_config", "reboot_required", "ready" and "services".

    Raises:
        OSError: if /proc/uptime or /proc/meminfo cannot be opened, or the
            load average is unobtainable.
    """
    uptime = _read_uptime_seconds()
    load1, load5, load15 = os.getloadavg()

    meminfo = _read_meminfo()
    # /proc/meminfo reports kB; integer-divide by 1024 to get MB.
    total = meminfo.get("MemTotal", 0) // 1024
    # "free" is reported from MemAvailable rather than MemFree.
    free = meminfo.get("MemAvailable", 0) // 1024

    disk = shutil.disk_usage("/")
    services = list_services_with_health()
    updates_state = auto_updates_state()
    updates_config = read_updates_config(updates_state)

    return {
        "hostname": socket.gethostname(),
        "uptime_seconds": uptime,
        "load": [load1, load5, load15],
        "memory_mb": {"total": total, "free": free},
        # shutil.disk_usage returns bytes; convert to MB.
        "disk_mb": {"total": disk.total // 1024 // 1024, "free": disk.free // 1024 // 1024},
        "cpu_temp_c": _cpu_temp(),
        "lan_ip": _lan_ip(),
        "os_version": _os_version(),
        "auto_updates_enabled": updates_state.get("enabled", False),
        "auto_updates": updates_state,
        "updates_config": updates_config,
        "reboot_required": reboot_required(),
        "ready": READY_FILE.exists(),
        "services": services,
    }