"""Collect host status and per-service health information."""

import os
import pathlib
import shutil
import socket
import subprocess
from typing import Any, Dict, List, Optional

from .auto_updates import auto_updates_state, read_updates_config
from .constants import CORE_NAME, CORE_PORTS, READY_FILE
from .helpers import default_host, detect_https, normalize_path, port_online, reboot_required
from .services import load_services, ufw_status_allows


def _cpu_temp() -> Optional[float]:
    """Return the CPU temperature, or ``None`` when it cannot be read.

    The sysfs thermal zone file holds an integer in millidegrees Celsius
    (per the kernel's sysfs thermal documentation), hence the division by
    1000.
    """
    for path in ("/sys/class/thermal/thermal_zone0/temp",):
        try:
            return float(pathlib.Path(path).read_text().strip()) / 1000.0
        except (OSError, ValueError):
            # Missing/unreadable file or non-numeric content: best-effort
            # probe, so fall through to the next candidate path.
            continue
    return None


def _os_version() -> str:
    """Return ``PRETTY_NAME`` from ``/etc/os-release``.

    Falls back to ``"DietPi"`` when the file cannot be read or has no
    ``PRETTY_NAME=`` line.
    """
    os_ver = "DietPi"
    try:
        for line in pathlib.Path("/etc/os-release").read_text().splitlines():
            if line.startswith("PRETTY_NAME="):
                os_ver = line.split("=", 1)[1].strip().strip('"')
                break
    except (OSError, ValueError):
        # Unreadable or undecodable file: keep the default name.
        pass
    return os_ver


def _lan_ip() -> Optional[str]:
    """Return the first address printed by ``hostname -I``, or ``None``.

    ``None`` is returned when the command fails, is unavailable, or prints
    nothing.
    """
    try:
        out = subprocess.check_output(["hostname", "-I"], text=True).strip()
    except (OSError, subprocess.SubprocessError, ValueError):
        return None
    return out.split()[0] if out else None


def _with_health(svc) -> Dict[str, Any]:
    """Return a copy of *svc* annotated with ``online`` / ``firewall_open``.

    The two keys are only added when the service has a truthy ``port``.
    The input mapping is never mutated.
    """
    entry = dict(svc)
    port = entry.get("port")
    if port:
        entry["online"] = port_online("127.0.0.1", port)
        entry["firewall_open"] = ufw_status_allows(port)
    return entry


def list_services_with_health() -> List[Dict[str, Any]]:
    """Return every loaded service, copied and annotated with health flags."""
    return [_with_health(svc) for svc in load_services()]


def list_services_for_ui() -> List[Dict[str, Any]]:
    """Return services with health flags plus ``scheme`` and ``url``.

    For each service with a truthy ``port``, the scheme is taken from the
    service's own ``scheme`` entry when set; otherwise it is ``"https"`` if
    ``detect_https(host, port)`` is truthy and ``"http"`` if not. Services
    without a port are returned as plain copies.
    """
    services = []
    for raw in load_services():
        svc = _with_health(raw)
        port = svc.get("port")
        if port:
            host = default_host()
            path = normalize_path(svc.get("path"))
            scheme = svc.get("scheme") or ("https" if detect_https(host, port) else "http")
            svc["scheme"] = scheme
            svc["url"] = f"{scheme}://{host}:{port}{path}"
        services.append(svc)
    return services


def _read_uptime_seconds() -> float:
    """Return the first field of ``/proc/uptime`` as a float."""
    # Context manager so the handle is closed deterministically.
    with open("/proc/uptime") as fh:
        return float(fh.read().split()[0])


def _read_meminfo() -> Dict[str, int]:
    """Parse ``/proc/meminfo`` into ``{field name: integer value}``.

    Values are kept exactly as listed in the file (kB according to
    proc(5)). Lines lacking a ``:`` separator or an integer value are
    skipped rather than aborting the whole status collection.
    """
    meminfo: Dict[str, int] = {}
    with open("/proc/meminfo") as fh:
        for line in fh:
            key, sep, rest = line.partition(":")
            fields = rest.split()
            if not sep or not fields:
                continue
            try:
                meminfo[key] = int(fields[0])
            except ValueError:
                continue
    return meminfo


def collect_status() -> Dict[str, Any]:
    """Gather a snapshot of host and service status.

    Returns:
        A dict with hostname, uptime, load averages, memory and disk
        figures (in MB), CPU temperature, LAN IP, OS version, auto-update
        state/config, reboot-required and ready flags, and the service list
        from ``list_services_with_health()``.

    Raises:
        OSError: if ``/proc/uptime`` or ``/proc/meminfo`` cannot be read, or
            load averages are unavailable.
    """
    uptime = _read_uptime_seconds()
    load1, load5, load15 = os.getloadavg()

    meminfo = _read_meminfo()
    total = meminfo.get("MemTotal", 0) // 1024
    # "free" deliberately reports MemAvailable, not MemFree.
    free = meminfo.get("MemAvailable", 0) // 1024

    disk = shutil.disk_usage("/")
    services = list_services_with_health()
    updates_state = auto_updates_state()
    updates_config = read_updates_config(updates_state)

    data = {
        "hostname": socket.gethostname(),
        "uptime_seconds": uptime,
        "load": [load1, load5, load15],
        "memory_mb": {"total": total, "free": free},
        "disk_mb": {
            "total": disk.total // 1024 // 1024,
            "free": disk.free // 1024 // 1024,
        },
        "cpu_temp_c": _cpu_temp(),
        "lan_ip": _lan_ip(),
        "os_version": _os_version(),
        "auto_updates_enabled": updates_state.get("enabled", False),
        "auto_updates": updates_state,
        "updates_config": updates_config,
        "reboot_required": reboot_required(),
        "ready": READY_FILE.exists(),
        "services": services,
    }
    return data