import pathlib import re import subprocess from typing import Any, Dict, Optional from .constants import ( ALL_PATTERNS, APT_AUTO_CFG, APT_UA_BASE, APT_UA_OVERRIDE, DEFAULT_UPDATE_TIME, DEFAULT_UPGRADE_TIME, SECURITY_PATTERNS, ) from .helpers import strip_comments, validate_time def auto_updates_enabled() -> bool: if not APT_AUTO_CFG.exists(): return False text = APT_AUTO_CFG.read_text() return 'APT::Periodic::Unattended-Upgrade "1";' in text def set_auto_updates(enable: bool) -> None: """ Toggle unattended upgrades in a way that matches systemd state, not just the apt config file. Assumes unattended-upgrades is already installed. """ units_maskable = [ "apt-daily.service", "apt-daily-upgrade.service", "apt-daily.timer", "apt-daily-upgrade.timer", "unattended-upgrades.service", ] timers = ["apt-daily.timer", "apt-daily-upgrade.timer"] service = "unattended-upgrades.service" APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True) if enable: APT_AUTO_CFG.write_text( 'APT::Periodic::Update-Package-Lists "1";\n' 'APT::Periodic::Unattended-Upgrade "1";\n' ) for unit in units_maskable: subprocess.run(["systemctl", "unmask", unit], check=False) for unit in timers + [service]: subprocess.run(["systemctl", "enable", unit], check=False) for unit in timers: subprocess.run(["systemctl", "start", unit], check=False) subprocess.run(["systemctl", "start", service], check=False) else: APT_AUTO_CFG.write_text( 'APT::Periodic::Update-Package-Lists "0";\n' 'APT::Periodic::Unattended-Upgrade "0";\n' ) for unit in timers + [service]: subprocess.run(["systemctl", "stop", unit], check=False) subprocess.run(["systemctl", "disable", unit], check=False) for unit in units_maskable: subprocess.run(["systemctl", "mask", unit], check=False) def _systemctl_is(unit: str, verb: str) -> bool: try: out = subprocess.check_output(["systemctl", verb, unit], text=True).strip() return out == "enabled" if verb == "is-enabled" else out == "active" except Exception: return False def auto_updates_state() -> Dict[str, Any]: config_on = auto_updates_enabled() service = "unattended-upgrades.service" timers = ["apt-daily.timer", "apt-daily-upgrade.timer"] state: Dict[str, Any] = { "config_enabled": config_on, "service_enabled": _systemctl_is(service, "is-enabled"), "service_active": _systemctl_is(service, "is-active"), "timers_enabled": {}, "timers_active": {}, } for t in timers: state["timers_enabled"][t] = _systemctl_is(t, "is-enabled") state["timers_active"][t] = _systemctl_is(t, "is-active") state["enabled"] = ( config_on and state["service_enabled"] and all(state["timers_enabled"].values()) ) return state def _parse_directive(text: str, key: str, default=None, as_bool=False, as_int=False): text = strip_comments(text) pattern = rf'{re.escape(key)}\s+"?([^";\n]+)"?;' m = re.search(pattern, text) if not m: return default val = m.group(1).strip() if as_bool: return val.lower() in ("1", "true", "yes", "on") if as_int: try: return int(val) except ValueError: return default return val def _parse_origins_patterns(text: str): text = strip_comments(text) m = re.search(r"Unattended-Upgrade::Origins-Pattern\s*{([^}]*)}", text, re.S) patterns = [] if not m: return patterns body = m.group(1) for line in body.splitlines(): ln = line.strip().strip('";') if ln: patterns.append(ln) return patterns def _read_timer_time(timer: str): try: out = subprocess.check_output( ["systemctl", "show", "--property=TimersCalendar", timer], text=True ) m = re.search(r"OnCalendar=[^0-9]*([0-9]{1,2}):([0-9]{2})", out) if m: return f"{int(m.group(1)):02d}:{m.group(2)}" except Exception: pass return None def read_updates_config(state=None) -> Dict[str, Any]: """ Return a normalized unattended-upgrades configuration snapshot. Values are sourced from the Pi-Kit override file when present, else the base file. """ text = "" for path in (APT_UA_OVERRIDE, APT_UA_BASE): if path.exists(): try: text += path.read_text() + "\n" except Exception: pass scope_hint = None m_scope = re.search(r"PIKIT_SCOPE:\s*(\w+)", text) if m_scope: scope_hint = m_scope.group(1).lower() cleaned = strip_comments(text) patterns = _parse_origins_patterns(cleaned) scope = ( scope_hint or ("all" if any("label=Debian" in p and "-security" not in p for p in patterns) else "security") ) cleanup = _parse_directive(text, "Unattended-Upgrade::Remove-Unused-Dependencies", False, as_bool=True) auto_reboot = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot", False, as_bool=True) reboot_time = validate_time(_parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-Time", DEFAULT_UPGRADE_TIME), DEFAULT_UPGRADE_TIME) reboot_with_users = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-WithUsers", False, as_bool=True) bandwidth = _parse_directive(text, "Acquire::http::Dl-Limit", None, as_int=True) update_time = _read_timer_time("apt-daily.timer") or DEFAULT_UPDATE_TIME upgrade_time = _read_timer_time("apt-daily-upgrade.timer") or DEFAULT_UPGRADE_TIME state = state or auto_updates_state() return { "enabled": bool(state.get("enabled", False)), "scope": scope, "cleanup": bool(cleanup), "bandwidth_limit_kbps": bandwidth, "auto_reboot": bool(auto_reboot), "reboot_time": reboot_time, "reboot_with_users": bool(reboot_with_users), "update_time": update_time, "upgrade_time": upgrade_time, "state": state, } def _write_timer_override(timer: str, time_str: str): time_norm = validate_time(time_str, DEFAULT_UPDATE_TIME) override_dir = pathlib.Path(f"/etc/systemd/system/{timer}.d") override_dir.mkdir(parents=True, exist_ok=True) override_file = override_dir / "pikit.conf" override_file.write_text( "[Timer]\n" f"OnCalendar=*-*-* {time_norm}\n" "Persistent=true\n" "RandomizedDelaySec=30min\n" ) subprocess.run(["systemctl", "daemon-reload"], check=False) subprocess.run(["systemctl", "restart", timer], check=False) def set_updates_config(opts: Dict[str, Any]) -> Dict[str, Any]: """ Apply unattended-upgrades configuration from dashboard inputs. """ enable = bool(opts.get("enable", True)) scope = opts.get("scope") or "all" patterns = ALL_PATTERNS if scope == "all" else SECURITY_PATTERNS cleanup = bool(opts.get("cleanup", False)) bandwidth = opts.get("bandwidth_limit_kbps") auto_reboot = bool(opts.get("auto_reboot", False)) reboot_time = validate_time(opts.get("reboot_time"), DEFAULT_UPGRADE_TIME) reboot_with_users = bool(opts.get("reboot_with_users", False)) update_time = validate_time(opts.get("update_time"), DEFAULT_UPDATE_TIME) upgrade_time = validate_time(opts.get("upgrade_time") or opts.get("update_time"), DEFAULT_UPGRADE_TIME) APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True) set_auto_updates(enable) lines = [ "// Managed by Pi-Kit dashboard", f"// PIKIT_SCOPE: {scope}", "Unattended-Upgrade::Origins-Pattern {", ] for p in patterns: lines.append(f' "{p}";') lines.append("};") lines.append(f'Unattended-Upgrade::Remove-Unused-Dependencies {"true" if cleanup else "false"};') lines.append(f'Unattended-Upgrade::Automatic-Reboot {"true" if auto_reboot else "false"};') lines.append(f'Unattended-Upgrade::Automatic-Reboot-Time "{reboot_time}";') lines.append( f'Unattended-Upgrade::Automatic-Reboot-WithUsers {"true" if reboot_with_users else "false"};' ) if bandwidth is not None: lines.append(f'Acquire::http::Dl-Limit "{int(bandwidth)}";') APT_UA_OVERRIDE.parent.mkdir(parents=True, exist_ok=True) APT_UA_OVERRIDE.write_text("\n".join(lines) + "\n") _write_timer_override("apt-daily.timer", update_time) _write_timer_override("apt-daily-upgrade.timer", upgrade_time) return read_updates_config()