diff --git a/Cargo.lock b/Cargo.lock index 42771e6..3da4c0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,6 +288,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -743,6 +749,12 @@ version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.2" @@ -1126,6 +1138,19 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.38" @@ -1356,6 +1381,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.4.2", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "thiserror" version = "2.0.18" @@ -1624,11 +1662,13 @@ dependencies = [ "axum", "chrono", "clap", + "libc", "md5", "reqwest", "roxmltree", "serde", "serde_json", + "tempfile", "thiserror", "tokio", "toml", diff --git a/Cargo.toml b/Cargo.toml index 5d8193e..a44a97b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ anyhow = "1.0" axum = "0.8" chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.5", features = ["derive"] } +libc = "0.2" md5 = "0.7" reqwest = { version = "0.12", default-features = false, features = ["charset", "http2", "json", "multipart", "rustls-tls"] } roxmltree = "0.20" @@ -20,3 +21,6 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } urlencoding = "2.1" uuid = { version = "1.0", features = ["serde", "v4"] } + +[dev-dependencies] +tempfile = "3.23" diff --git a/PROJECT_MAP.md b/PROJECT_MAP.md index 0170e35..f8c0d4a 100644 --- a/PROJECT_MAP.md +++ b/PROJECT_MAP.md @@ -19,7 +19,7 @@ script and control smart TVs through a stable, brand-agnostic API. ## Project Status -**Phase:** Milestone 2 complete. Roku adapter is live-validated; daemon and CLI wiring are next. +**Phase:** Milestone 4 in progress. Daemon core is complete; CLI coverage is expanding beyond daemon/device/app-cache operations. **Platform v1:** Roku only (via ECP HTTP API) **Language:** Rust **Crate type:** Binary (single binary distribution target) @@ -49,6 +49,8 @@ tvctl/ │ │ └── mod.rs │ ├── daemon/ ← tvctld daemon core scaffolding │ │ ├── mod.rs +│ │ ├── config.rs ← Config loading and runtime path helpers +│ │ ├── ipc.rs ← Unix socket request/response protocol │ │ ├── registry.rs ← Device registry │ │ ├── discovery.rs ← SSDP discovery service │ │ ├── cache.rs ← App cache manager @@ -335,9 +337,9 @@ enabled = true ## What Has NOT Been Started -- Daemon runtime, socket transport, and persistence logic - HTTP route handlers and request validation -- Real CLI command handling beyond skeleton parsing +- CLI command handling for remote/state/dev/config flows +- Daemon install/uninstall helpers and systemd unit generation - CI/CD configuration - Release/packaging diff --git a/ROADMAP.md b/ROADMAP.md index 167d41b..9468b18 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -7,14 +7,15 @@ ## Current Focus -**Milestone 3 — Daemon Core** -Roku adapter work is complete. Begin daemon runtime and persistence wiring. +**Milestone 4 — CLI** +Daemon core is complete. Continue broadening CLI coverage against the running daemon. --- ## In Progress -_Nothing in progress right now._ +- Milestone 4 is underway with working `daemon`, `device`, and app-cache commands over the Unix socket +- Remaining CLI work is remote/state/dev/config coverage plus daemon install/uninstall helpers --- @@ -47,35 +48,35 @@ _Goal: Can communicate with a real Roku TV over ECP._ ## Milestone 3 — Daemon Core _Goal: tvctld runs, manages devices, and handles the Unix socket._ -- [ ] Daemon entry point and lifecycle (`src/daemon/mod.rs`) -- [ ] Unix socket listener -- [ ] Device registry (`src/daemon/registry.rs`) - - Load from `devices.json` on start - - Persist on change - - CRUD operations -- [ ] Discovery service (`src/daemon/discovery.rs`) - - SSDP scan - - Auto-discover on startup (if configured) - - Interval-based re-scan - - Manual add by IP -- [ ] App cache manager (`src/daemon/cache.rs`) - - Per-platform JSON files - - Organic growth strategy - - `app refresh` invalidation -- [ ] State cache (`src/daemon/state.rs`) +- [x] 2026-04-14 — Daemon entry point and lifecycle (`src/daemon/mod.rs`) +- [x] 2026-04-14 — Unix socket listener +- [x] 2026-04-14 — Device registry (`src/daemon/registry.rs`) + - [x] Load from `devices.json` on start + - [x] Persist on change + - [x] CRUD operations +- [x] 2026-04-14 — Discovery service (`src/daemon/discovery.rs`) + - [x] SSDP scan + - [x] Auto-discover on startup (if configured) + - [x] Interval-based re-scan + - [x] Manual add by IP +- [x] 2026-04-14 — App cache manager (`src/daemon/cache.rs`) + - [x] Per-platform JSON files + - [x] Organic growth strategy + - [x] `app refresh` invalidation +- [x] 2026-04-14 — State cache (`src/daemon/state.rs`) - In-memory only - Per-device last-known state - Timestamp on every entry -- [ ] Adapter registry (map platform string → adapter instance) -- [ ] Config loading from TOML +- [x] 2026-04-14 — Adapter registry (map platform string → adapter instance) +- [x] 2026-04-14 — Config loading from TOML --- ## Milestone 4 — CLI _Goal: All tvctl commands work against a running daemon._ -- [ ] CLI entry point and dispatch (`src/cli/mod.rs`) -- [ ] Unix socket client (send commands, receive responses) +- [x] 2026-04-14 — CLI entry point and dispatch (`src/cli/mod.rs`) +- [x] 2026-04-14 — Unix socket client (send commands, receive responses) - [ ] `tvctl daemon` commands - `start` `stop` `restart` `status` - `install` (generate systemd user unit) @@ -158,6 +159,9 @@ out of scope until Milestone 6 is complete and stable. - [x] 2026-04-14 — Add Roku ECP discovery, input, app, and state adapter support with unit tests - [x] 2026-04-14 — Add env-gated live Roku integration tests and validate against the Hisense TV on the LAN - [x] 2026-04-14 — Implement Roku developer-mode install/log support and validate sideloading on the Hisense TV +- [x] 2026-04-14 — Add daemon config loading, runtime paths, persisted registry/cache stores, and discovery foundations +- [x] 2026-04-14 — Add daemon Unix socket IPC plus working `daemon` and `device` lifecycle/discovery commands +- [x] 2026-04-14 — Finish Milestone 3 with registry CRUD, periodic discovery, manual add, and app-cache refresh plus live daemon validation --- diff --git a/src/adapters/roku/mod.rs b/src/adapters/roku/mod.rs index ae1c5c9..1eb5856 100644 --- a/src/adapters/roku/mod.rs +++ b/src/adapters/roku/mod.rs @@ -133,15 +133,19 @@ impl RokuAdapter { Ok(()) } - fn device_base_url(device: &Device) -> Result { - let host = match device.address { + fn base_url_for(address: IpAddr, port: u16) -> Result { + let host = match address { IpAddr::V4(address) => address.to_string(), IpAddr::V6(address) => format!("[{address}]"), }; - Url::parse(&format!("http://{host}:{}/", device.port)) + Url::parse(&format!("http://{host}:{port}/")) .map_err(|error| TvError::Transport(format!("invalid device URL: {error}"))) } + fn device_base_url(device: &Device) -> Result { + Self::base_url_for(device.address, device.port) + } + fn join_url(base_url: &Url, path: &str) -> Result { base_url .join(path) @@ -172,6 +176,18 @@ impl RokuAdapter { parse_device_info(&xml) } + /// Probe a Roku device at a known address and port. + pub async fn probe_device(&self, address: IpAddr, port: u16) -> Result { + let base_url = Self::base_url_for(address, port)?; + let info = self.fetch_device_info(&base_url).await?; + Ok(DeviceInfo { + name: info.display_name(), + platform: "roku".to_string(), + address, + port, + }) + } + fn dev_base_url(device: &Device) -> Result { let host = match device.address { IpAddr::V4(address) => address.to_string(), diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 348779d..1b74553 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -1,4 +1,29 @@ -use clap::{Parser, Subcommand}; +use std::{net::IpAddr, path::PathBuf, process::Stdio, time::Duration}; + +use clap::{Args, CommandFactory, Parser, Subcommand}; +use serde::Serialize; +use thiserror::Error; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::UnixStream, + process::Command as TokioCommand, + time::sleep, +}; + +use crate::{ + adapters::{AppInfo, Device}, + daemon::{ + self, + config::{RuntimePaths, TvctlConfig}, + ipc::{ + AppListResult, AppRefreshResult, DaemonRequest, DaemonResponse, DaemonStatus, + DiscoveryResult, + }, + }, +}; + +const DAEMON_START_WAIT_ATTEMPTS: usize = 20; +const DAEMON_START_WAIT_INTERVAL: Duration = Duration::from_millis(250); /// The tvctl command-line interface. #[derive(Debug, Parser)] @@ -22,14 +47,23 @@ pub struct Cli { } /// The top-level resource namespaces exposed by tvctl. -#[derive(Debug, Subcommand)] +#[derive(Debug, Clone, Subcommand)] pub enum Command { /// Manage the background daemon. - Daemon, + Daemon { + #[command(subcommand)] + command: DaemonCommand, + }, /// Discover and manage devices. - Device, - /// List, launch, and stop applications. - App, + Device { + #[command(subcommand)] + command: DeviceCommand, + }, + /// List and refresh application metadata. + App { + #[command(subcommand)] + command: AppCommand, + }, /// Send remote control input. Remote, /// Query device state. @@ -38,10 +72,515 @@ pub enum Command { Dev, /// Inspect and modify tvctl configuration. Config, + /// Internal daemon entry point used by `tvctl daemon start`. + #[command(hide = true, name = "__daemon_serve")] + InternalDaemonServe, } -/// Parse the CLI and return successfully for the repository scaffold. -pub async fn run() -> anyhow::Result<()> { - let _ = Cli::parse(); +/// Manage the tvctld lifecycle. +#[derive(Debug, Clone, Subcommand)] +pub enum DaemonCommand { + /// Start the background daemon process. + Start, + /// Stop the running daemon process. + Stop, + /// Restart the running daemon process. + Restart, + /// Show whether the daemon is running. + Status, +} + +/// Discover and inspect known devices. +#[derive(Debug, Clone, Subcommand)] +pub enum DeviceCommand { + /// List devices currently known to the daemon. + List, + /// Trigger a fresh discovery scan. + Discover, + /// Manually add a device by probing its address. + Add(DeviceAddArgs), + /// Show one known device by name or UUID. + Info { + /// The friendly name or UUID to inspect. + target: String, + }, + /// Remove one known device by name or UUID. + Remove { + /// The friendly name or UUID to remove. + target: String, + }, + /// Mark one known device as the default target. + Select { + /// The friendly name or UUID to make default. + target: String, + }, +} + +/// Arguments for `tvctl device add`. +#[derive(Debug, Clone, Args)] +pub struct DeviceAddArgs { + /// The normalized platform identifier, currently `roku`. + #[arg(long)] + pub platform: String, + + /// The device IP address. + #[arg(long)] + pub address: IpAddr, + + /// Optional platform port override. + #[arg(long)] + pub port: Option, + + /// Optional user-assigned friendly name. + #[arg(long)] + pub name: Option, +} + +/// App-cache commands. +#[derive(Debug, Clone, Subcommand)] +pub enum AppCommand { + /// List cached apps for the selected or default device platform. + List, + /// Refresh the cached app list from the selected or default device. + Refresh { + /// Clear the platform cache before reloading from the device. + #[arg(long)] + clear: bool, + }, +} + +/// A user-facing CLI error with a suggested next action. +#[derive(Debug, Error)] +#[error("{message}\nHint: {hint}")] +pub struct CliError { + /// The human-readable error message. + pub message: String, + /// The suggested next action. + pub hint: String, +} + +impl CliError { + fn new(message: impl Into, hint: impl Into) -> Self { + Self { + message: message.into(), + hint: hint.into(), + } + } +} + +/// Parse the CLI and execute the selected command. +pub async fn run() -> Result<(), CliError> { + let cli = Cli::parse(); + let Some(command) = cli.command.clone() else { + let mut cmd = Cli::command(); + cmd.print_long_help().map_err(|error| { + CliError::new( + format!("Failed to render CLI help: {error}"), + "Retry the command or inspect the terminal output.", + ) + })?; + println!(); + return Ok(()); + }; + + match command { + Command::InternalDaemonServe => daemon::serve().await.map_err(|error| { + CliError::new(error.to_string(), "Inspect the daemon logs and retry.") + }), + Command::Daemon { command } => handle_daemon_command(&cli, command).await, + Command::Device { command } => handle_device_command(&cli, command).await, + Command::App { command } => handle_app_command(&cli, command).await, + Command::Remote => Err(CliError::new( + "Remote commands are not wired to the daemon yet.", + "Continue Milestone 4 after the daemon protocol is in place.", + )), + Command::State => Err(CliError::new( + "State queries are not wired to the daemon yet.", + "Continue Milestone 4 after the daemon protocol is in place.", + )), + Command::Dev => Err(CliError::new( + "Developer commands are not wired to the daemon yet.", + "Continue Milestone 4 after the daemon protocol is in place.", + )), + Command::Config => Err(CliError::new( + "Config commands are not wired to the daemon yet.", + "Continue Milestone 4 after the daemon protocol is in place.", + )), + } +} + +async fn handle_daemon_command(cli: &Cli, command: DaemonCommand) -> Result<(), CliError> { + match command { + DaemonCommand::Start => daemon_start(cli).await, + DaemonCommand::Stop => daemon_stop(cli).await, + DaemonCommand::Restart => { + if daemon_status_payload().await.is_some() { + daemon_stop(cli).await?; + } + daemon_start(cli).await + } + DaemonCommand::Status => daemon_status(cli).await, + } +} + +async fn handle_device_command(cli: &Cli, command: DeviceCommand) -> Result<(), CliError> { + match command { + DeviceCommand::List => { + let response = + send_request(load_socket_path().await?, &DaemonRequest::ListDevices).await?; + let devices: Vec = parse_response_data(response)?; + render(cli, &devices, || render_device_list(&devices)) + } + DeviceCommand::Discover => { + let response = + send_request(load_socket_path().await?, &DaemonRequest::Discover).await?; + let result: DiscoveryResult = parse_response_data(response)?; + render(cli, &result, || render_discovery_result(&result.devices)) + } + DeviceCommand::Add(args) => { + let response = send_request( + load_socket_path().await?, + &DaemonRequest::AddDevice { + platform: args.platform, + address: args.address, + port: args.port, + name: args.name, + }, + ) + .await?; + let device: Device = parse_response_data(response)?; + render(cli, &device, || { + format!( + "Added {} [{}] {}:{}{}", + device.name, + device.platform, + device.address, + device.port, + default_marker(&device) + ) + }) + } + DeviceCommand::Info { target } => { + let response = send_request( + load_socket_path().await?, + &DaemonRequest::GetDevice { target }, + ) + .await?; + let device: Device = parse_response_data(response)?; + render(cli, &device, || render_device_info(&device)) + } + DeviceCommand::Remove { target } => { + let response = send_request( + load_socket_path().await?, + &DaemonRequest::RemoveDevice { target }, + ) + .await?; + let device: Device = parse_response_data(response)?; + render(cli, &device, || format!("Removed {}.", device.name)) + } + DeviceCommand::Select { target } => { + let response = send_request( + load_socket_path().await?, + &DaemonRequest::SelectDevice { target }, + ) + .await?; + let device: Device = parse_response_data(response)?; + render(cli, &device, || { + format!("Default device set to {}.", device.name) + }) + } + } +} + +async fn handle_app_command(cli: &Cli, command: AppCommand) -> Result<(), CliError> { + match command { + AppCommand::List => { + let response = send_request( + load_socket_path().await?, + &DaemonRequest::ListApps { + device: cli.device.clone(), + platform: None, + }, + ) + .await?; + let result: AppListResult = parse_response_data(response)?; + render(cli, &result, || { + render_app_list(&result.apps, &result.platform) + }) + } + AppCommand::Refresh { clear } => { + let response = send_request( + load_socket_path().await?, + &DaemonRequest::RefreshApps { + device: cli.device.clone(), + clear, + }, + ) + .await?; + let result: AppRefreshResult = parse_response_data(response)?; + render(cli, &result, || render_app_refresh(&result)) + } + } +} + +async fn daemon_start(cli: &Cli) -> Result<(), CliError> { + if let Some(status) = daemon_status_payload().await { + return render(cli, &status, || { + format!("tvctld is already running on {}", status.socket) + }); + } + + let exe = std::env::current_exe().map_err(|error| { + CliError::new( + format!("Unable to locate the tvctl binary: {error}"), + "Run the command from an installed or built tvctl executable.", + ) + })?; + + TokioCommand::new(exe) + .arg("__daemon_serve") + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .map_err(|error| { + CliError::new( + format!("Failed to launch tvctld: {error}"), + "Check filesystem permissions and try again.", + ) + })?; + + for _ in 0..DAEMON_START_WAIT_ATTEMPTS { + if let Some(status) = daemon_status_payload().await { + return render(cli, &status, || { + format!("tvctld started on {}", status.socket) + }); + } + sleep(DAEMON_START_WAIT_INTERVAL).await; + } + + Err(CliError::new( + "tvctld did not become ready in time.", + "Check whether the socket path is writable and retry `tvctl daemon start`.", + )) +} + +async fn daemon_stop(cli: &Cli) -> Result<(), CliError> { + let socket = load_socket_path().await?; + let response = send_request(socket, &DaemonRequest::Shutdown).await?; + let data: serde_json::Value = parse_response_data(response)?; + render(cli, &data, || "tvctld stopped.".to_string()) +} + +async fn daemon_status(cli: &Cli) -> Result<(), CliError> { + if let Some(status) = daemon_status_payload().await { + return render(cli, &status, || { + format!( + "tvctld is running on {} with {} known device(s).", + status.socket, status.device_count + ) + }); + } + + if cli.json { + let status = serde_json::json!({ + "running": false, + }); + return render(cli, &status, || "tvctld is not running.".to_string()); + } + + println!("tvctld is not running."); Ok(()) } + +async fn daemon_status_payload() -> Option { + let socket = load_socket_path().await.ok()?; + let response = send_request(socket, &DaemonRequest::Ping).await.ok()?; + parse_response_data(response).ok() +} + +async fn load_socket_path() -> Result { + let config = TvctlConfig::load().await.map_err(|error| { + CliError::new( + format!("Failed to load tvctl configuration: {error}"), + "Inspect ~/.config/tvctl/config.toml for invalid TOML.", + ) + })?; + let fallback = RuntimePaths::detect().socket_file; + let configured = PathBuf::from(config.daemon.socket); + Ok(if configured.as_os_str().is_empty() { + fallback + } else { + configured + }) +} + +async fn send_request( + socket_path: PathBuf, + request: &DaemonRequest, +) -> Result { + let mut stream = UnixStream::connect(&socket_path).await.map_err(|error| { + CliError::new( + format!( + "Unable to reach tvctld at {}: {error}", + socket_path.display() + ), + "Run `tvctl daemon start` first.", + ) + })?; + + let bytes = serde_json::to_vec(request).map_err(|error| { + CliError::new( + format!("Failed to serialize daemon request: {error}"), + "Inspect the CLI build and retry.", + ) + })?; + stream.write_all(&bytes).await.map_err(|error| { + CliError::new( + format!("Failed to write request to tvctld: {error}"), + "Check the daemon socket permissions and retry.", + ) + })?; + stream.shutdown().await.map_err(|error| { + CliError::new( + format!("Failed to finish the daemon request: {error}"), + "Retry the command after restarting the daemon.", + ) + })?; + + let mut response_bytes = Vec::new(); + stream + .read_to_end(&mut response_bytes) + .await + .map_err(|error| { + CliError::new( + format!("Failed to read the daemon response: {error}"), + "Retry the command after restarting the daemon.", + ) + })?; + + let response = serde_json::from_slice::(&response_bytes).map_err(|error| { + CliError::new( + format!("Failed to decode the daemon response: {error}"), + "Ensure the CLI and daemon are from the same build.", + ) + })?; + + if let Some(error) = &response.error { + return Err(CliError::new( + error.message.clone(), + error.hint.clone().unwrap_or_else(|| { + "Retry the command after checking the daemon state.".to_string() + }), + )); + } + + Ok(response) +} + +fn parse_response_data(response: DaemonResponse) -> Result +where + T: serde::de::DeserializeOwned, +{ + let data = response.data.ok_or_else(|| { + CliError::new( + "The daemon response did not include data.", + "Ensure the CLI and daemon are from the same build.", + ) + })?; + serde_json::from_value(data).map_err(|error| { + CliError::new( + format!("Failed to decode daemon payload: {error}"), + "Ensure the CLI and daemon are from the same build.", + ) + }) +} + +fn render(cli: &Cli, data: &T, human: impl FnOnce() -> String) -> Result<(), CliError> +where + T: Serialize, +{ + if cli.json { + let json = serde_json::to_string_pretty(data).map_err(|error| { + CliError::new( + format!("Failed to encode JSON output: {error}"), + "Retry the command or inspect the output type.", + ) + })?; + println!("{json}"); + } else { + println!("{}", human()); + } + Ok(()) +} + +fn render_device_list(devices: &[Device]) -> String { + if devices.is_empty() { + return "No devices are registered yet.".to_string(); + } + + devices + .iter() + .map(|device| { + format!( + "{} [{}] {}:{}{}", + device.name, + device.platform, + device.address, + device.port, + default_marker(device) + ) + }) + .collect::>() + .join("\n") +} + +fn render_discovery_result(devices: &[Device]) -> String { + if devices.is_empty() { + return "Discovery completed, but no devices were found.".to_string(); + } + format!( + "Discovered {} device(s).\n{}", + devices.len(), + render_device_list(devices) + ) +} + +fn render_device_info(device: &Device) -> String { + [ + format!("Name: {}", device.name), + format!("Original Name: {}", device.original_name), + format!("UUID: {}", device.id), + format!("Platform: {}", device.platform), + format!("Address: {}:{}", device.address, device.port), + format!("Default: {}", device.is_default), + format!("Discovered At: {}", device.discovered_at), + format!("Last Seen: {}", device.last_seen), + ] + .join("\n") +} + +fn render_app_list(apps: &[AppInfo], platform: &str) -> String { + if apps.is_empty() { + return format!("No cached apps are known yet for platform {platform}."); + } + + format!( + "Cached apps for {platform}:\n{}", + apps.iter() + .map(|app| format!("{} [{}]", app.name, app.platform_id)) + .collect::>() + .join("\n") + ) +} + +fn render_app_refresh(result: &AppRefreshResult) -> String { + format!( + "Refreshed {} app(s) from {}. {} cached app(s) are now known for {}.", + result.refreshed_count, result.device.name, result.cached_count, result.platform + ) +} + +fn default_marker(device: &Device) -> &'static str { + if device.is_default { " (default)" } else { "" } +} diff --git a/src/daemon/cache.rs b/src/daemon/cache.rs index 15c88cb..647fd35 100644 --- a/src/daemon/cache.rs +++ b/src/daemon/cache.rs @@ -1,3 +1,7 @@ +use std::{collections::BTreeMap, path::PathBuf}; + +use tokio::fs; + use crate::adapters::AppInfo; /// A platform-level cache of app metadata discovered from live devices. @@ -8,3 +12,184 @@ pub struct AppCache { /// The apps currently known for that platform. pub apps: Vec, } + +/// Persisted store for per-platform app cache files. +#[derive(Debug, Clone)] +pub struct AppCacheStore { + root_dir: PathBuf, +} + +impl AppCacheStore { + /// Create a cache store rooted at the given directory. + pub fn new(root_dir: PathBuf) -> Self { + Self { root_dir } + } + + /// Load the app cache for a single platform or return an empty cache when absent. + pub async fn load_platform(&self, platform: &str) -> anyhow::Result { + let path = self.platform_path(platform); + let apps = match fs::read_to_string(&path).await { + Ok(contents) => serde_json::from_str::>(&contents)?, + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Vec::new(), + Err(error) => return Err(error.into()), + }; + + Ok(AppCache { + platform: platform.to_string(), + apps, + }) + } + + /// Persist the app list for a platform. + pub async fn save_platform(&self, platform: &str, apps: &[AppInfo]) -> anyhow::Result<()> { + fs::create_dir_all(&self.root_dir).await?; + let contents = serde_json::to_string_pretty(apps)?; + fs::write(self.platform_path(platform), contents).await?; + Ok(()) + } + + /// Merge newly seen apps into the normalized, de-duplicated platform cache. + pub async fn record_platform_apps( + &self, + platform: &str, + apps: Vec, + ) -> anyhow::Result { + let mut deduped = BTreeMap::new(); + for app in self.load_platform(platform).await?.apps { + deduped.insert(app.platform_id.clone(), app); + } + for app in apps { + deduped.insert(app.platform_id.clone(), app); + } + let apps: Vec = deduped.into_values().collect(); + self.save_platform(platform, &apps).await?; + Ok(AppCache { + platform: platform.to_string(), + apps, + }) + } + + /// Resolve an app by case-insensitive name or exact ID from the persisted platform cache. + pub async fn find_app(&self, platform: &str, query: &str) -> anyhow::Result> { + let cache = self.load_platform(platform).await?; + let normalized = query.to_ascii_lowercase(); + Ok(cache.apps.into_iter().find(|app| { + app.platform_id == query + || app.id == query + || app.name.to_ascii_lowercase() == normalized + })) + } + + /// Remove the persisted app cache for a platform. + pub async fn clear_platform(&self, platform: &str) -> anyhow::Result<()> { + match fs::remove_file(self.platform_path(platform)).await { + Ok(()) => Ok(()), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(error) => Err(error.into()), + } + } + + fn platform_path(&self, platform: &str) -> PathBuf { + self.root_dir.join(format!("{platform}.apps.json")) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn cache_round_trips_and_resolves_names() { + let temp_dir = tempfile::tempdir().expect("temp dir should exist"); + let store = AppCacheStore::new(temp_dir.path().join("cache")); + let apps = vec![ + AppInfo { + id: "12".to_string(), + name: "Netflix".to_string(), + version: None, + platform_id: "12".to_string(), + }, + AppInfo { + id: "837".to_string(), + name: "YouTube".to_string(), + version: None, + platform_id: "837".to_string(), + }, + ]; + + store + .record_platform_apps("roku", apps.clone()) + .await + .expect("apps should save"); + + let loaded = store.load_platform("roku").await.expect("apps should load"); + assert_eq!(loaded.apps, apps); + + let resolved = store + .find_app("roku", "youtube") + .await + .expect("app lookup should work") + .expect("youtube should exist"); + assert_eq!(resolved.platform_id, "837"); + } + + #[tokio::test] + async fn record_platform_apps_merges_existing_entries() { + let temp_dir = tempfile::tempdir().expect("temp dir should exist"); + let store = AppCacheStore::new(temp_dir.path().join("cache")); + + store + .record_platform_apps( + "roku", + vec![AppInfo { + id: "12".to_string(), + name: "Netflix".to_string(), + version: None, + platform_id: "12".to_string(), + }], + ) + .await + .expect("first cache should save"); + store + .record_platform_apps( + "roku", + vec![AppInfo { + id: "837".to_string(), + name: "YouTube".to_string(), + version: None, + platform_id: "837".to_string(), + }], + ) + .await + .expect("second cache should merge"); + + let loaded = store.load_platform("roku").await.expect("apps should load"); + assert_eq!(loaded.apps.len(), 2); + } + + #[tokio::test] + async fn clear_platform_removes_persisted_cache() { + let temp_dir = tempfile::tempdir().expect("temp dir should exist"); + let store = AppCacheStore::new(temp_dir.path().join("cache")); + + store + .save_platform( + "roku", + &[AppInfo { + id: "12".to_string(), + name: "Netflix".to_string(), + version: None, + platform_id: "12".to_string(), + }], + ) + .await + .expect("apps should save"); + store + .clear_platform("roku") + .await + .expect("cache should clear"); + + let loaded = store.load_platform("roku").await.expect("apps should load"); + assert!(loaded.apps.is_empty()); + } +} diff --git a/src/daemon/config.rs b/src/daemon/config.rs new file mode 100644 index 0000000..257786c --- /dev/null +++ b/src/daemon/config.rs @@ -0,0 +1,235 @@ +use std::{ + env, + path::{Path, PathBuf}, +}; + +use serde::{Deserialize, Serialize}; +use tokio::fs; + +/// The complete daemon configuration loaded from TOML. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct TvctlConfig { + /// Runtime daemon settings. + pub daemon: DaemonConfig, + /// Discovery scan behavior. + pub discovery: DiscoveryConfig, + /// Default-device settings. + pub devices: DeviceConfig, + /// Developer tooling toggles. + pub dev: DevConfig, +} + +impl Default for TvctlConfig { + fn default() -> Self { + Self { + daemon: DaemonConfig::default(), + discovery: DiscoveryConfig::default(), + devices: DeviceConfig::default(), + dev: DevConfig::default(), + } + } +} + +impl TvctlConfig { + /// Load configuration from the default XDG path or return defaults when absent. + pub async fn load() -> anyhow::Result { + Self::load_from_path(&default_config_path()).await + } + + /// Load configuration from a specific path or return defaults when absent. + pub async fn load_from_path(path: &Path) -> anyhow::Result { + match fs::read_to_string(path).await { + Ok(contents) => Ok(toml::from_str::(&contents)?), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()), + Err(error) => Err(error.into()), + } + } + + /// Persist configuration to a specific path, creating parent directories first. + pub async fn save_to_path(&self, path: &Path) -> anyhow::Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + let contents = toml::to_string_pretty(self)?; + fs::write(path, contents).await?; + Ok(()) + } +} + +/// Runtime daemon settings. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct DaemonConfig { + /// Unix socket path for CLI communication. + pub socket: String, + /// Whether the HTTP API is enabled. + pub http_enabled: bool, + /// Loopback HTTP port. + pub http_port: u16, + /// Loopback host or bind address. + pub http_host: String, + /// Logging level. + pub log_level: String, +} + +impl Default for DaemonConfig { + fn default() -> Self { + Self { + socket: default_socket_path().to_string_lossy().into_owned(), + http_enabled: true, + http_port: 7272, + http_host: "127.0.0.1".to_string(), + log_level: "info".to_string(), + } + } +} + +/// Discovery scan behavior. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct DiscoveryConfig { + /// Whether discovery runs automatically on daemon start. + pub auto_discover: bool, + /// How often discovery repeats, in seconds. + pub interval_secs: u64, + /// Per-device timeout in seconds. + pub timeout_secs: u64, +} + +impl Default for DiscoveryConfig { + fn default() -> Self { + Self { + auto_discover: true, + interval_secs: 300, + timeout_secs: 5, + } + } +} + +/// Default-device settings. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(default)] +pub struct DeviceConfig { + /// The default device name or UUID. + pub default: String, +} + +/// Developer tooling toggles. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct DevConfig { + /// Whether developer tooling is enabled. + pub enabled: bool, +} + +impl Default for DevConfig { + fn default() -> Self { + Self { enabled: true } + } +} + +/// Canonical runtime paths used by the daemon. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RuntimePaths { + /// The configuration file path. + pub config_file: PathBuf, + /// The data directory root. + pub data_dir: PathBuf, + /// The device registry path. + pub devices_file: PathBuf, + /// The platform app cache directory. + pub cache_dir: PathBuf, + /// The runtime socket path. + pub socket_file: PathBuf, +} + +impl RuntimePaths { + /// Build the canonical path set from XDG defaults. + pub fn detect() -> Self { + let config_file = default_config_path(); + let data_dir = default_data_dir(); + let cache_dir = data_dir.join("cache"); + Self { + config_file, + devices_file: data_dir.join("devices.json"), + socket_file: default_socket_path(), + data_dir, + cache_dir, + } + } +} + +/// Return the default config file path. +pub fn default_config_path() -> PathBuf { + default_config_dir().join("config.toml") +} + +/// Return the default config directory path. +pub fn default_config_dir() -> PathBuf { + if let Ok(path) = env::var("XDG_CONFIG_HOME") { + return PathBuf::from(path).join("tvctl"); + } + home_dir().join(".config/tvctl") +} + +/// Return the default data directory path. +pub fn default_data_dir() -> PathBuf { + if let Ok(path) = env::var("XDG_DATA_HOME") { + return PathBuf::from(path).join("tvctl"); + } + home_dir().join(".local/share/tvctl") +} + +/// Return the default runtime socket path. +pub fn default_socket_path() -> PathBuf { + let runtime_dir = env::var("XDG_RUNTIME_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(format!("/run/user/{}", current_uid()))); + runtime_dir.join("tvctl.sock") +} + +fn home_dir() -> PathBuf { + env::var("HOME") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/tmp")) +} + +fn current_uid() -> u32 { + // SAFETY: geteuid reads process state and has no side effects. + unsafe { libc::geteuid() } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn missing_config_uses_defaults() { + let temp_dir = tempfile::tempdir().expect("temp dir should exist"); + let config = TvctlConfig::load_from_path(&temp_dir.path().join("missing.toml")) + .await + .expect("default config should load"); + assert_eq!(config, TvctlConfig::default()); + } + + #[tokio::test] + async fn config_round_trips() { + let temp_dir = tempfile::tempdir().expect("temp dir should exist"); + let path = temp_dir.path().join("config.toml"); + let config = TvctlConfig { + devices: DeviceConfig { + default: "living-room".to_string(), + }, + ..TvctlConfig::default() + }; + config + .save_to_path(&path) + .await + .expect("config should save"); + let loaded = TvctlConfig::load_from_path(&path) + .await + .expect("config should load"); + assert_eq!(loaded, config); + } +} diff --git a/src/daemon/discovery.rs b/src/daemon/discovery.rs index 4edbbb5..b215f0d 100644 --- a/src/daemon/discovery.rs +++ b/src/daemon/discovery.rs @@ -1,3 +1,41 @@ +use anyhow::Context; + +use crate::adapters::Device; + +use super::registry::{AdapterRegistry, DeviceRegistry}; + /// Background discovery orchestration for supported TV platforms. -#[derive(Debug, Clone, Default)] -pub struct DiscoveryService; +#[derive(Debug, Clone)] +pub struct DiscoveryService { + adapters: AdapterRegistry, +} + +impl DiscoveryService { + /// Create a discovery service over the registered adapters. + pub fn new(adapters: AdapterRegistry) -> Self { + Self { adapters } + } + + /// Discover all supported platforms and merge them into the registry. + pub async fn discover_all(&self, registry: &mut DeviceRegistry) -> anyhow::Result> { + let mut discovered = Vec::new(); + for platform in self.adapters.supported_platforms() { + let mut devices = self + .discover_platform(platform, registry) + .await + .with_context(|| format!("failed discovery for platform '{platform}'"))?; + discovered.append(&mut devices); + } + Ok(discovered) + } + + /// Discover one platform and merge the results into the registry. + pub async fn discover_platform( + &self, + platform: &str, + registry: &mut DeviceRegistry, + ) -> anyhow::Result> { + let discovered = self.adapters.discover(platform).await?; + Ok(registry.merge_discovered(discovered)) + } +} diff --git a/src/daemon/ipc.rs b/src/daemon/ipc.rs new file mode 100644 index 0000000..3bc8d45 --- /dev/null +++ b/src/daemon/ipc.rs @@ -0,0 +1,150 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::net::IpAddr; + +use crate::adapters::{AppInfo, Device}; + +/// A request sent from the CLI to the daemon over the Unix socket. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "command", rename_all = "snake_case")] +pub enum DaemonRequest { + /// Check whether the daemon is alive. + Ping, + /// Ask the daemon to shut down cleanly. + Shutdown, + /// Trigger discovery across supported platforms. + Discover, + /// List all known devices in the registry. + ListDevices, + /// Return one known device by UUID or friendly name. + GetDevice { + /// The UUID or friendly name to resolve. + target: String, + }, + /// Manually add a device by probing the provided address. + AddDevice { + /// The normalized platform identifier. + platform: String, + /// The device IP address. + address: IpAddr, + /// Optional platform port override. + port: Option, + /// Optional user-assigned friendly name. + name: Option, + }, + /// Remove one known device by UUID or friendly name. + RemoveDevice { + /// The UUID or friendly name to remove. + target: String, + }, + /// Mark one known device as the default target. + SelectDevice { + /// The UUID or friendly name to select. + target: String, + }, + /// Return cached apps for a platform or target device. + ListApps { + /// Optional UUID or friendly name. + device: Option, + /// Optional platform override when no device is provided. + platform: Option, + }, + /// Refresh cached apps for one device. + RefreshApps { + /// Optional UUID or friendly name. + device: Option, + /// Whether to clear the platform cache before reloading from the device. + clear: bool, + }, +} + +/// A standard daemon response envelope for IPC. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct DaemonResponse { + /// Whether the request succeeded. + pub ok: bool, + /// Success payload data. + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, + /// Structured error payload. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +impl DaemonResponse { + /// Construct a success response with JSON payload. + pub fn success(data: T) -> Self { + Self { + ok: true, + data: Some(serde_json::to_value(data).unwrap_or(Value::Null)), + error: None, + } + } + + /// Construct an error response. + pub fn error(code: &str, message: impl Into, hint: impl Into>) -> Self { + Self { + ok: false, + data: None, + error: Some(DaemonError { + code: code.to_string(), + message: message.into(), + hint: hint.into(), + }), + } + } +} + +/// A stable IPC error payload. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DaemonError { + /// Stable machine-readable error code. + pub code: String, + /// Human-readable error message. + pub message: String, + /// Suggested next action. + #[serde(skip_serializing_if = "Option::is_none")] + pub hint: Option, +} + +/// A daemon health/status payload. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DaemonStatus { + /// The daemon process ID. + pub pid: u32, + /// The socket path being served. + pub socket: String, + /// Whether the HTTP API is enabled. + pub http_enabled: bool, + /// The number of known devices. + pub device_count: usize, +} + +/// Discovery results returned by the daemon. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DiscoveryResult { + /// The devices discovered during this scan. + pub devices: Vec, +} + +/// Cached app-list payload returned by the daemon. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AppListResult { + /// The platform the cache belongs to. + pub platform: String, + /// The currently cached apps for that platform. + pub apps: Vec, +} + +/// App refresh results returned by the daemon. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AppRefreshResult { + /// The device used for the refresh. + pub device: Device, + /// The platform cache that was updated. + pub platform: String, + /// The number of apps returned by the live device. + pub refreshed_count: usize, + /// The total number of cached apps after merge/replace. + pub cached_count: usize, +} diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index b961b7d..5bf803d 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -1,8 +1,522 @@ pub mod cache; +pub mod config; pub mod discovery; +pub mod ipc; pub mod registry; pub mod state; +use std::{ + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; + +use cache::AppCacheStore; +use config::{RuntimePaths, TvctlConfig}; +use discovery::DiscoveryService; +use ipc::{ + AppListResult, AppRefreshResult, DaemonRequest, DaemonResponse, DaemonStatus, DiscoveryResult, +}; +use registry::{AdapterRegistry, DeviceRegistry}; +use state::StateCache; +use tokio::{ + fs, + io::{AsyncReadExt, AsyncWriteExt}, + net::{UnixListener, UnixStream}, + sync::Mutex, + time::{self, MissedTickBehavior}, +}; +use tracing::warn; + +#[cfg(unix)] +use std::os::unix::fs::PermissionsExt; + +use crate::adapters::Device; + /// The long-lived tvctld process. -#[derive(Debug, Default)] -pub struct Daemon; +#[derive(Debug)] +pub struct Daemon { + /// The loaded daemon configuration. + pub config: TvctlConfig, + /// The canonical runtime path set. + pub paths: RuntimePaths, + /// Persisted known devices. + pub registry: DeviceRegistry, + /// Persisted platform app metadata. + pub app_cache: AppCacheStore, + /// In-memory state snapshots. + pub state_cache: StateCache, + /// Available platform adapters. + pub adapters: AdapterRegistry, + /// Discovery orchestration over registered adapters. + pub discovery: DiscoveryService, +} + +impl Daemon { + /// Load the daemon's persisted state and adapter registry. + pub async fn load() -> anyhow::Result { + let config = TvctlConfig::load().await?; + let mut paths = RuntimePaths::detect(); + if !config.daemon.socket.is_empty() { + paths.socket_file = PathBuf::from(&config.daemon.socket); + } + let adapters = AdapterRegistry::default(); + let mut registry = DeviceRegistry::load(paths.devices_file.clone()).await?; + if !config.devices.default.is_empty() { + let _ = registry.set_default(&config.devices.default); + } else { + registry.ensure_default(); + } + let app_cache = AppCacheStore::new(paths.cache_dir.clone()); + let discovery = DiscoveryService::new(adapters.clone()); + + Ok(Self { + config, + paths, + registry, + app_cache, + state_cache: StateCache::default(), + adapters, + discovery, + }) + } +} + +/// Run the long-lived daemon loop over a Unix socket. +pub async fn serve() -> anyhow::Result<()> { + let daemon = Arc::new(Mutex::new(Daemon::load().await?)); + + { + let mut guard = daemon.lock().await; + if guard.config.discovery.auto_discover { + run_discovery(&mut guard).await?; + } + } + + let (socket_path, interval_secs) = { + let guard = daemon.lock().await; + ( + guard.paths.socket_file.clone(), + guard.config.discovery.interval_secs, + ) + }; + + if let Some(parent) = socket_path.parent() { + fs::create_dir_all(parent).await?; + } + + let _ = fs::remove_file(&socket_path).await; + let listener = UnixListener::bind(&socket_path)?; + set_socket_permissions(&socket_path).await?; + let mut discovery_interval = discovery_interval(interval_secs); + if let Some(interval) = discovery_interval.as_mut() { + interval.tick().await; + } + + loop { + if let Some(interval) = discovery_interval.as_mut() { + tokio::select! { + _ = interval.tick() => { + let mut guard = daemon.lock().await; + if let Err(error) = run_discovery(&mut guard).await { + warn!("Periodic discovery failed: {error}"); + } + } + accepted = listener.accept() => { + let (stream, _) = accepted?; + let should_stop = handle_connection(stream, daemon.clone()).await?; + if should_stop { + break; + } + } + } + } else { + let (stream, _) = listener.accept().await?; + let should_stop = handle_connection(stream, daemon.clone()).await?; + if should_stop { + break; + } + } + } + + let _ = fs::remove_file(&socket_path).await; + Ok(()) +} + +async fn set_socket_permissions(path: &Path) -> anyhow::Result<()> { + #[cfg(unix)] + { + fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)).await?; + } + Ok(()) +} + +async fn handle_connection( + mut stream: UnixStream, + daemon: Arc>, +) -> anyhow::Result { + let mut request_bytes = Vec::new(); + stream.read_to_end(&mut request_bytes).await?; + + let request = match serde_json::from_slice::(&request_bytes) { + Ok(request) => request, + Err(error) => { + let response = DaemonResponse::error( + "invalid_request", + format!("Invalid daemon request: {error}"), + Some("Upgrade the CLI or inspect the daemon socket protocol.".to_string()), + ); + write_response(&mut stream, &response).await?; + return Ok(false); + } + }; + + let (response, should_stop) = handle_request(request, daemon).await; + write_response(&mut stream, &response).await?; + Ok(should_stop) +} + +async fn write_response(stream: &mut UnixStream, response: &DaemonResponse) -> anyhow::Result<()> { + let bytes = serde_json::to_vec(response)?; + stream.write_all(&bytes).await?; + stream.shutdown().await?; + Ok(()) +} + +async fn handle_request( + request: DaemonRequest, + daemon: Arc>, +) -> (DaemonResponse, bool) { + match request { + DaemonRequest::Ping => { + let guard = daemon.lock().await; + ( + DaemonResponse::success(DaemonStatus { + pid: std::process::id(), + socket: guard.paths.socket_file.display().to_string(), + http_enabled: guard.config.daemon.http_enabled, + device_count: guard.registry.devices.len(), + }), + false, + ) + } + DaemonRequest::Shutdown => ( + DaemonResponse::success(serde_json::json!({ + "message": "Daemon shutdown requested." + })), + true, + ), + DaemonRequest::Discover => { + let mut guard = daemon.lock().await; + match run_discovery(&mut guard).await { + Ok(devices) => (DaemonResponse::success(DiscoveryResult { devices }), false), + Err(error) => ( + DaemonResponse::error( + "discovery_failed", + format!("Device discovery failed: {error}"), + Some( + "Verify SSDP works on this network or add the device manually." + .to_string(), + ), + ), + false, + ), + } + } + DaemonRequest::ListDevices => { + let guard = daemon.lock().await; + ( + DaemonResponse::success(guard.registry.devices.clone()), + false, + ) + } + DaemonRequest::GetDevice { target } => { + let guard = daemon.lock().await; + match guard.registry.find(&target).cloned() { + Some(device) => (DaemonResponse::success(device), false), + None => ( + DaemonResponse::error( + "device_not_found", + format!("Device '{target}' is not in the registry."), + Some( + "Run `tvctl device list` or `tvctl device discover` first.".to_string(), + ), + ), + false, + ), + } + } + DaemonRequest::AddDevice { + platform, + address, + port, + name, + } => { + let mut guard = daemon.lock().await; + match guard.adapters.probe_manual(&platform, address, port).await { + Ok(info) => { + let device = guard.registry.merge_manual(info, name); + if let Err(error) = sync_registry_config(&mut guard).await { + return ( + DaemonResponse::error( + "registry_save_failed", + error, + Some( + "Check permissions for the tvctl config and data directories." + .to_string(), + ), + ), + false, + ); + } + (DaemonResponse::success(device), false) + } + Err(error) => ( + DaemonResponse::error( + "manual_add_failed", + format!( + "Could not add {platform} device at {address}: {}", + error.root_cause() + ), + Some( + "Verify the platform, IP, and port, then make sure the TV is reachable." + .to_string(), + ), + ), + false, + ), + } + } + DaemonRequest::RemoveDevice { target } => { + let mut guard = daemon.lock().await; + match guard.registry.remove(&target) { + Some(device) => { + if let Err(error) = sync_registry_config(&mut guard).await { + return ( + DaemonResponse::error( + "registry_save_failed", + error, + Some( + "Check permissions for the tvctl config and data directories." + .to_string(), + ), + ), + false, + ); + } + (DaemonResponse::success(device), false) + } + None => ( + DaemonResponse::error( + "device_not_found", + format!("Device '{target}' is not in the registry."), + Some( + "Run `tvctl device list` to confirm the device name or UUID." + .to_string(), + ), + ), + false, + ), + } + } + DaemonRequest::SelectDevice { target } => { + let mut guard = daemon.lock().await; + match guard.registry.set_default(&target) { + Some(device) => { + guard.config.devices.default = device.id.to_string(); + if let Err(error) = sync_registry_config(&mut guard).await { + return ( + DaemonResponse::error( + "config_save_failed", + error, + Some( + "Check permissions for the tvctl config and data directories." + .to_string(), + ), + ), + false, + ); + } + (DaemonResponse::success(device), false) + } + None => ( + DaemonResponse::error( + "device_not_found", + format!("Device '{target}' is not in the registry."), + Some( + "Run `tvctl device list` to confirm the device name or UUID." + .to_string(), + ), + ), + false, + ), + } + } + DaemonRequest::ListApps { device, platform } => { + let guard = daemon.lock().await; + let platform = match (device.as_deref(), platform) { + (Some(target), platform) => { + let device = match resolve_target_device(&guard.registry, Some(target)) { + Ok(device) => device, + Err(response) => return (response, false), + }; + let platform = platform.unwrap_or_else(|| device.platform.clone()); + if platform != device.platform { + return ( + DaemonResponse::error( + "platform_mismatch", + format!( + "Requested platform '{platform}' does not match device platform '{}'.", + device.platform + ), + Some("Omit `--platform` or choose a device on the requested platform.".to_string()), + ), + false, + ); + } + platform + } + (None, Some(platform)) => platform, + (None, None) => match resolve_target_device(&guard.registry, None) { + Ok(device) => device.platform, + Err(response) => return (response, false), + }, + }; + match guard.app_cache.load_platform(&platform).await { + Ok(cache) => ( + DaemonResponse::success(AppListResult { + platform, + apps: cache.apps, + }), + false, + ), + Err(error) => ( + DaemonResponse::error( + "app_cache_load_failed", + format!("Failed to load the cached app list: {error}"), + Some("Refresh the app cache or check filesystem permissions.".to_string()), + ), + false, + ), + } + } + DaemonRequest::RefreshApps { device, clear } => { + let guard = daemon.lock().await; + let device = match resolve_target_device(&guard.registry, device.as_deref()) { + Ok(device) => device, + Err(response) => return (response, false), + }; + if clear { + if let Err(error) = guard.app_cache.clear_platform(&device.platform).await { + return ( + DaemonResponse::error( + "app_cache_clear_failed", + format!("Failed to clear the cached app list: {error}"), + Some("Check permissions for ~/.local/share/tvctl/cache.".to_string()), + ), + false, + ); + } + } + match guard.adapters.list_apps(&device).await { + Ok(apps) => match guard + .app_cache + .record_platform_apps(&device.platform, apps.clone()) + .await + { + Ok(cache) => ( + DaemonResponse::success(AppRefreshResult { + device, + platform: cache.platform, + refreshed_count: apps.len(), + cached_count: cache.apps.len(), + }), + false, + ), + Err(error) => ( + DaemonResponse::error( + "app_cache_save_failed", + format!("App refresh succeeded but cache persistence failed: {error}"), + Some("Check permissions for ~/.local/share/tvctl/cache.".to_string()), + ), + false, + ), + }, + Err(error) => ( + DaemonResponse::error( + "app_refresh_failed", + format!("Failed to fetch apps from {}: {error}", device.name), + Some("Verify the TV is online and supports app listing.".to_string()), + ), + false, + ), + } + } + } +} + +async fn run_discovery(daemon: &mut Daemon) -> anyhow::Result> { + let discovery = daemon.discovery.clone(); + let devices = discovery.discover_all(&mut daemon.registry).await?; + if !daemon.config.devices.default.is_empty() { + let _ = daemon.registry.set_default(&daemon.config.devices.default); + } else { + daemon.registry.ensure_default(); + } + sync_registry_config(daemon) + .await + .map_err(anyhow::Error::msg)?; + Ok(devices) +} + +async fn sync_registry_config(daemon: &mut Daemon) -> Result<(), String> { + daemon.registry.ensure_default(); + daemon.config.devices.default = daemon + .registry + .default_device() + .map(|device| device.id.to_string()) + .unwrap_or_default(); + daemon + .registry + .save() + .await + .map_err(|error| format!("Failed to save the device registry: {error}"))?; + daemon + .config + .save_to_path(&daemon.paths.config_file) + .await + .map_err(|error| format!("Failed to save the tvctl config: {error}"))?; + Ok(()) +} + +fn resolve_target_device( + registry: &DeviceRegistry, + target: Option<&str>, +) -> Result { + match target { + Some(target) => registry.find(target).cloned().ok_or_else(|| { + DaemonResponse::error( + "device_not_found", + format!("Device '{target}' is not in the registry."), + Some("Run `tvctl device list` to confirm the device name or UUID.".to_string()), + ) + }), + None => registry.default_device().cloned().ok_or_else(|| { + DaemonResponse::error( + "no_default_device", + "No default device is configured yet.".to_string(), + Some("Run `tvctl device discover` or `tvctl device add`, then `tvctl device select`.".to_string()), + ) + }), + } +} + +fn discovery_interval(interval_secs: u64) -> Option { + if interval_secs == 0 { + return None; + } + + let mut interval = time::interval(Duration::from_secs(interval_secs)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + Some(interval) +} diff --git a/src/daemon/registry.rs b/src/daemon/registry.rs index defc678..04d720c 100644 --- a/src/daemon/registry.rs +++ b/src/daemon/registry.rs @@ -1,8 +1,396 @@ -use crate::adapters::Device; +use std::{net::IpAddr, path::PathBuf}; + +use anyhow::Context; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use tokio::fs; +use uuid::Uuid; + +use crate::adapters::{Device, DeviceInfo, TvAdapter, roku::RokuAdapter}; /// The persisted collection of known devices. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeviceRegistry { + path: PathBuf, /// All devices currently remembered by the daemon. pub devices: Vec, } + +impl DeviceRegistry { + /// Load the registry from disk or return an empty registry when absent. + pub async fn load(path: PathBuf) -> anyhow::Result { + let devices = match fs::read_to_string(&path).await { + Ok(contents) => serde_json::from_str::>(&contents).with_context(|| { + format!("failed to parse device registry at {}", path.display()) + })?, + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Vec::new(), + Err(error) => return Err(error.into()), + }; + + Ok(Self { path, devices }) + } + + /// Persist the current registry to disk. + pub async fn save(&self) -> anyhow::Result<()> { + if let Some(parent) = self.path.parent() { + fs::create_dir_all(parent).await?; + } + let contents = serde_json::to_string_pretty(&self.devices)?; + fs::write(&self.path, contents).await?; + Ok(()) + } + + /// Return all known devices. + pub fn list(&self) -> &[Device] { + &self.devices + } + + /// Upsert discovered devices, preserving UUIDs for known entries. + pub fn merge_discovered(&mut self, discovered: Vec) -> Vec { + discovered + .into_iter() + .map(|info| self.upsert_device(info, None)) + .collect() + } + + /// Add or update a manually specified device after it has been probed. + pub fn merge_manual(&mut self, info: DeviceInfo, name: Option) -> Device { + self.upsert_device(info, name) + } + + /// Set the default device by UUID or name. + pub fn set_default(&mut self, target: &str) -> Option { + let selected = self.find(target)?.id; + let mut selected_device = None; + + for device in &mut self.devices { + let is_match = device.id == selected; + device.is_default = is_match; + if is_match { + selected_device = Some(device.clone()); + } + } + + selected_device + } + + /// Find a device by UUID or case-insensitive name. + pub fn find(&self, target: &str) -> Option<&Device> { + let target_uuid = Uuid::parse_str(target).ok(); + let normalized = target.to_ascii_lowercase(); + self.devices.iter().find(|device| { + target_uuid.map(|uuid| device.id == uuid).unwrap_or(false) + || device.name.to_ascii_lowercase() == normalized + }) + } + + /// Return the current default device, if any. + pub fn default_device(&self) -> Option<&Device> { + self.devices.iter().find(|device| device.is_default) + } + + /// Remove a device by UUID or case-insensitive name. + pub fn remove(&mut self, target: &str) -> Option { + let index = self + .devices + .iter() + .position(|device| matches_target(device, target))?; + let removed = self.devices.remove(index); + self.ensure_default(); + Some(removed) + } + + /// Ensure the registry's default marker is valid and singular. + pub fn ensure_default(&mut self) { + if self.devices.is_empty() { + return; + } + + let Some(default_index) = self.devices.iter().position(|device| device.is_default) else { + self.devices[0].is_default = true; + return; + }; + + for (index, device) in self.devices.iter_mut().enumerate() { + device.is_default = index == default_index; + } + } + + fn upsert_device(&mut self, info: DeviceInfo, name: Option) -> Device { + let DeviceInfo { + name: original_name, + platform, + address, + port, + } = info; + let now = Utc::now(); + + if let Some(device) = self.find_platform_address_mut(&platform, address) { + device.port = port; + device.original_name = original_name.clone(); + if let Some(name) = name { + device.name = name; + } + device.last_seen = now; + return device.clone(); + } + + let is_default = self.devices.is_empty(); + let device = Device { + id: Uuid::new_v4(), + name: name.unwrap_or_else(|| original_name.clone()), + original_name, + platform, + address, + port, + is_default, + discovered_at: now, + last_seen: now, + }; + self.devices.push(device.clone()); + self.ensure_default(); + device + } + + fn find_platform_address_mut( + &mut self, + platform: &str, + address: IpAddr, + ) -> Option<&mut Device> { + self.devices + .iter_mut() + .find(|device| device.platform == platform && device.address == address) + } +} + +impl Default for DeviceRegistry { + fn default() -> Self { + Self { + path: PathBuf::from("devices.json"), + devices: Vec::new(), + } + } +} + +/// A registry of platform adapters available to the daemon. +#[derive(Debug, Clone)] +pub struct AdapterRegistry { + roku: RokuAdapter, +} + +impl Default for AdapterRegistry { + fn default() -> Self { + Self { + roku: RokuAdapter::new(), + } + } +} + +impl AdapterRegistry { + /// Return the supported platform names. + pub fn supported_platforms(&self) -> Vec<&'static str> { + vec!["roku"] + } + + /// Discover candidate devices for one platform. + pub async fn discover(&self, platform: &str) -> anyhow::Result> { + match platform { + "roku" => Ok(self.roku.discover().await?), + other => anyhow::bail!("unsupported platform '{other}'"), + } + } + + /// Return true when a platform is supported. + pub fn supports(&self, platform: &str) -> bool { + self.supported_platforms().contains(&platform) + } + + /// Probe a manually specified device to verify it matches the requested platform. + pub async fn probe_manual( + &self, + platform: &str, + address: IpAddr, + port: Option, + ) -> anyhow::Result { + match platform { + "roku" => Ok(self + .roku + .probe_device(address, port.unwrap_or(8060)) + .await?), + other => anyhow::bail!("unsupported platform '{other}'"), + } + } + + /// Return apps from a concrete device using its platform adapter. + pub async fn list_apps( + &self, + device: &Device, + ) -> anyhow::Result> { + match device.platform.as_str() { + "roku" => Ok(self.roku.list_apps(device).await?), + other => anyhow::bail!("unsupported platform '{other}'"), + } + } +} + +fn matches_target(device: &Device, target: &str) -> bool { + let target_uuid = Uuid::parse_str(target).ok(); + let normalized = target.to_ascii_lowercase(); + target_uuid.map(|uuid| device.id == uuid).unwrap_or(false) + || device.name.to_ascii_lowercase() == normalized +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn registry_round_trips_and_preserves_ids() { + let temp_dir = tempfile::tempdir().expect("temp dir should exist"); + let path = temp_dir.path().join("devices.json"); + let mut registry = DeviceRegistry::load(path.clone()) + .await + .expect("registry should load"); + + let first = registry.merge_discovered(vec![DeviceInfo { + name: "Living Room".to_string(), + platform: "roku".to_string(), + address: "10.0.0.5".parse().expect("valid ip"), + port: 8060, + }]); + let first_id = first[0].id; + registry.save().await.expect("registry should save"); + + let mut loaded = DeviceRegistry::load(path) + .await + .expect("registry should reload"); + let second = loaded.merge_discovered(vec![DeviceInfo { + name: "Living Room Roku".to_string(), + platform: "roku".to_string(), + address: "10.0.0.5".parse().expect("valid ip"), + port: 8060, + }]); + + assert_eq!(second[0].id, first_id); + assert_eq!(second[0].original_name, "Living Room Roku"); + } + + #[test] + fn set_default_matches_case_insensitive_names() { + let now = Utc::now(); + let mut registry = DeviceRegistry { + path: PathBuf::from("devices.json"), + devices: vec![ + Device { + id: Uuid::new_v4(), + name: "Living Room".to_string(), + original_name: "Living Room".to_string(), + platform: "roku".to_string(), + address: "10.0.0.5".parse().expect("valid ip"), + port: 8060, + is_default: false, + discovered_at: now, + last_seen: now, + }, + Device { + id: Uuid::new_v4(), + name: "Bedroom".to_string(), + original_name: "Bedroom".to_string(), + platform: "roku".to_string(), + address: "10.0.0.6".parse().expect("valid ip"), + port: 8060, + is_default: false, + discovered_at: now, + last_seen: now, + }, + ], + }; + + let selected = registry + .set_default("living room") + .expect("device should exist"); + assert_eq!(selected.name, "Living Room"); + assert_eq!( + registry.default_device().map(|device| device.name.as_str()), + Some("Living Room") + ); + } + + #[test] + fn merge_manual_updates_existing_device_without_replacing_id() { + let now = Utc::now(); + let id = Uuid::new_v4(); + let mut registry = DeviceRegistry { + path: PathBuf::from("devices.json"), + devices: vec![Device { + id, + name: "Office".to_string(), + original_name: "Office Roku".to_string(), + platform: "roku".to_string(), + address: "10.0.0.9".parse().expect("valid ip"), + port: 8060, + is_default: true, + discovered_at: now, + last_seen: now, + }], + }; + + let merged = registry.merge_manual( + DeviceInfo { + name: "Upstairs Roku".to_string(), + platform: "roku".to_string(), + address: "10.0.0.9".parse().expect("valid ip"), + port: 8061, + }, + Some("Bedroom".to_string()), + ); + + assert_eq!(merged.id, id); + assert_eq!(merged.name, "Bedroom"); + assert_eq!(merged.original_name, "Upstairs Roku"); + assert_eq!(merged.port, 8061); + } + + #[test] + fn remove_promotes_another_default_when_needed() { + let now = Utc::now(); + let living_room_id = Uuid::new_v4(); + let mut registry = DeviceRegistry { + path: PathBuf::from("devices.json"), + devices: vec![ + Device { + id: living_room_id, + name: "Living Room".to_string(), + original_name: "Living Room".to_string(), + platform: "roku".to_string(), + address: "10.0.0.5".parse().expect("valid ip"), + port: 8060, + is_default: true, + discovered_at: now, + last_seen: now, + }, + Device { + id: Uuid::new_v4(), + name: "Bedroom".to_string(), + original_name: "Bedroom".to_string(), + platform: "roku".to_string(), + address: "10.0.0.6".parse().expect("valid ip"), + port: 8060, + is_default: false, + discovered_at: now, + last_seen: now, + }, + ], + }; + + let removed = registry + .remove(&living_room_id.to_string()) + .expect("device should be removed"); + + assert_eq!(removed.name, "Living Room"); + assert_eq!( + registry.default_device().map(|device| device.name.as_str()), + Some("Bedroom") + ); + } +} diff --git a/src/daemon/state.rs b/src/daemon/state.rs index 68bb17e..1ecc280 100644 --- a/src/daemon/state.rs +++ b/src/daemon/state.rs @@ -10,3 +10,15 @@ pub struct StateCache { /// State entries keyed by device UUID. pub entries: HashMap, } + +impl StateCache { + /// Insert or replace the last known state for a device. + pub fn insert(&mut self, state: DeviceState) { + self.entries.insert(state.device_id, state); + } + + /// Read the last known state for a device. + pub fn get(&self, device_id: Uuid) -> Option<&DeviceState> { + self.entries.get(&device_id) + } +} diff --git a/src/main.rs b/src/main.rs index d93769b..57c62e6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ /// Launch the tvctl binary entry point. #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::from_default_env() @@ -10,5 +10,8 @@ async fn main() -> anyhow::Result<()> { .compact() .init(); - tvctl::cli::run().await + if let Err(error) = tvctl::cli::run().await { + eprintln!("{error}"); + std::process::exit(1); + } }