feat: complete daemon core milestone

Finish Milestone 3 with persisted config, socket IPC, registry CRUD,
periodic discovery, manual add, and app-cache refresh support.
This commit is contained in:
44r0n7
2026-04-14 10:19:14 -04:00
parent 642fa716d1
commit 29e53d16b0
14 changed files with 2176 additions and 46 deletions
Generated
+40
View File
@@ -288,6 +288,12 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "fastrand"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
[[package]] [[package]]
name = "find-msvc-tools" name = "find-msvc-tools"
version = "0.1.9" version = "0.1.9"
@@ -743,6 +749,12 @@ version = "0.2.185"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f"
[[package]]
name = "linux-raw-sys"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]] [[package]]
name = "litemap" name = "litemap"
version = "0.8.2" version = "0.8.2"
@@ -1126,6 +1138,19 @@ version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe"
[[package]]
name = "rustix"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190"
dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.23.38" version = "0.23.38"
@@ -1356,6 +1381,19 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tempfile"
version = "3.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand",
"getrandom 0.4.2",
"once_cell",
"rustix",
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "2.0.18" version = "2.0.18"
@@ -1624,11 +1662,13 @@ dependencies = [
"axum", "axum",
"chrono", "chrono",
"clap", "clap",
"libc",
"md5", "md5",
"reqwest", "reqwest",
"roxmltree", "roxmltree",
"serde", "serde",
"serde_json", "serde_json",
"tempfile",
"thiserror", "thiserror",
"tokio", "tokio",
"toml", "toml",
+4
View File
@@ -8,6 +8,7 @@ anyhow = "1.0"
axum = "0.8" axum = "0.8"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.5", features = ["derive"] } clap = { version = "4.5", features = ["derive"] }
libc = "0.2"
md5 = "0.7" md5 = "0.7"
reqwest = { version = "0.12", default-features = false, features = ["charset", "http2", "json", "multipart", "rustls-tls"] } reqwest = { version = "0.12", default-features = false, features = ["charset", "http2", "json", "multipart", "rustls-tls"] }
roxmltree = "0.20" roxmltree = "0.20"
@@ -20,3 +21,6 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
urlencoding = "2.1" urlencoding = "2.1"
uuid = { version = "1.0", features = ["serde", "v4"] } uuid = { version = "1.0", features = ["serde", "v4"] }
[dev-dependencies]
tempfile = "3.23"
+5 -3
View File
@@ -19,7 +19,7 @@ script and control smart TVs through a stable, brand-agnostic API.
## Project Status ## Project Status
**Phase:** Milestone 2 complete. Roku adapter is live-validated; daemon and CLI wiring are next. **Phase:** Milestone 4 in progress. Daemon core is complete; CLI coverage is expanding beyond daemon/device/app-cache operations.
**Platform v1:** Roku only (via ECP HTTP API) **Platform v1:** Roku only (via ECP HTTP API)
**Language:** Rust **Language:** Rust
**Crate type:** Binary (single binary distribution target) **Crate type:** Binary (single binary distribution target)
@@ -49,6 +49,8 @@ tvctl/
│ │ └── mod.rs │ │ └── mod.rs
│ ├── daemon/ ← tvctld daemon core scaffolding │ ├── daemon/ ← tvctld daemon core scaffolding
│ │ ├── mod.rs │ │ ├── mod.rs
│ │ ├── config.rs ← Config loading and runtime path helpers
│ │ ├── ipc.rs ← Unix socket request/response protocol
│ │ ├── registry.rs ← Device registry │ │ ├── registry.rs ← Device registry
│ │ ├── discovery.rs ← SSDP discovery service │ │ ├── discovery.rs ← SSDP discovery service
│ │ ├── cache.rs ← App cache manager │ │ ├── cache.rs ← App cache manager
@@ -335,9 +337,9 @@ enabled = true
## What Has NOT Been Started ## What Has NOT Been Started
- Daemon runtime, socket transport, and persistence logic
- HTTP route handlers and request validation - HTTP route handlers and request validation
- Real CLI command handling beyond skeleton parsing - CLI command handling for remote/state/dev/config flows
- Daemon install/uninstall helpers and systemd unit generation
- CI/CD configuration - CI/CD configuration
- Release/packaging - Release/packaging
+27 -23
View File
@@ -7,14 +7,15 @@
## Current Focus ## Current Focus
**Milestone 3Daemon Core** **Milestone 4CLI**
Roku adapter work is complete. Begin daemon runtime and persistence wiring. Daemon core is complete. Continue broadening CLI coverage against the running daemon.
--- ---
## In Progress ## In Progress
_Nothing in progress right now._ - Milestone 4 is underway with working `daemon`, `device`, and app-cache commands over the Unix socket
- Remaining CLI work is remote/state/dev/config coverage plus daemon install/uninstall helpers
--- ---
@@ -47,35 +48,35 @@ _Goal: Can communicate with a real Roku TV over ECP._
## Milestone 3 — Daemon Core ## Milestone 3 — Daemon Core
_Goal: tvctld runs, manages devices, and handles the Unix socket._ _Goal: tvctld runs, manages devices, and handles the Unix socket._
- [ ] Daemon entry point and lifecycle (`src/daemon/mod.rs`) - [x] 2026-04-14 — Daemon entry point and lifecycle (`src/daemon/mod.rs`)
- [ ] Unix socket listener - [x] 2026-04-14 — Unix socket listener
- [ ] Device registry (`src/daemon/registry.rs`) - [x] 2026-04-14 — Device registry (`src/daemon/registry.rs`)
- Load from `devices.json` on start - [x] Load from `devices.json` on start
- Persist on change - [x] Persist on change
- CRUD operations - [x] CRUD operations
- [ ] Discovery service (`src/daemon/discovery.rs`) - [x] 2026-04-14 — Discovery service (`src/daemon/discovery.rs`)
- SSDP scan - [x] SSDP scan
- Auto-discover on startup (if configured) - [x] Auto-discover on startup (if configured)
- Interval-based re-scan - [x] Interval-based re-scan
- Manual add by IP - [x] Manual add by IP
- [ ] App cache manager (`src/daemon/cache.rs`) - [x] 2026-04-14 — App cache manager (`src/daemon/cache.rs`)
- Per-platform JSON files - [x] Per-platform JSON files
- Organic growth strategy - [x] Organic growth strategy
- `app refresh` invalidation - [x] `app refresh` invalidation
- [ ] State cache (`src/daemon/state.rs`) - [x] 2026-04-14 — State cache (`src/daemon/state.rs`)
- In-memory only - In-memory only
- Per-device last-known state - Per-device last-known state
- Timestamp on every entry - Timestamp on every entry
- [ ] Adapter registry (map platform string → adapter instance) - [x] 2026-04-14 — Adapter registry (map platform string → adapter instance)
- [ ] Config loading from TOML - [x] 2026-04-14 — Config loading from TOML
--- ---
## Milestone 4 — CLI ## Milestone 4 — CLI
_Goal: All tvctl commands work against a running daemon._ _Goal: All tvctl commands work against a running daemon._
- [ ] CLI entry point and dispatch (`src/cli/mod.rs`) - [x] 2026-04-14 — CLI entry point and dispatch (`src/cli/mod.rs`)
- [ ] Unix socket client (send commands, receive responses) - [x] 2026-04-14 — Unix socket client (send commands, receive responses)
- [ ] `tvctl daemon` commands - [ ] `tvctl daemon` commands
- `start` `stop` `restart` `status` - `start` `stop` `restart` `status`
- `install` (generate systemd user unit) - `install` (generate systemd user unit)
@@ -158,6 +159,9 @@ out of scope until Milestone 6 is complete and stable.
- [x] 2026-04-14 — Add Roku ECP discovery, input, app, and state adapter support with unit tests - [x] 2026-04-14 — Add Roku ECP discovery, input, app, and state adapter support with unit tests
- [x] 2026-04-14 — Add env-gated live Roku integration tests and validate against the Hisense TV on the LAN - [x] 2026-04-14 — Add env-gated live Roku integration tests and validate against the Hisense TV on the LAN
- [x] 2026-04-14 — Implement Roku developer-mode install/log support and validate sideloading on the Hisense TV - [x] 2026-04-14 — Implement Roku developer-mode install/log support and validate sideloading on the Hisense TV
- [x] 2026-04-14 — Add daemon config loading, runtime paths, persisted registry/cache stores, and discovery foundations
- [x] 2026-04-14 — Add daemon Unix socket IPC plus working `daemon` and `device` lifecycle/discovery commands
- [x] 2026-04-14 — Finish Milestone 3 with registry CRUD, periodic discovery, manual add, and app-cache refresh plus live daemon validation
--- ---
+19 -3
View File
@@ -133,15 +133,19 @@ impl RokuAdapter {
Ok(()) Ok(())
} }
fn device_base_url(device: &Device) -> Result<Url> { fn base_url_for(address: IpAddr, port: u16) -> Result<Url> {
let host = match device.address { let host = match address {
IpAddr::V4(address) => address.to_string(), IpAddr::V4(address) => address.to_string(),
IpAddr::V6(address) => format!("[{address}]"), IpAddr::V6(address) => format!("[{address}]"),
}; };
Url::parse(&format!("http://{host}:{}/", device.port)) Url::parse(&format!("http://{host}:{port}/"))
.map_err(|error| TvError::Transport(format!("invalid device URL: {error}"))) .map_err(|error| TvError::Transport(format!("invalid device URL: {error}")))
} }
fn device_base_url(device: &Device) -> Result<Url> {
Self::base_url_for(device.address, device.port)
}
fn join_url(base_url: &Url, path: &str) -> Result<Url> { fn join_url(base_url: &Url, path: &str) -> Result<Url> {
base_url base_url
.join(path) .join(path)
@@ -172,6 +176,18 @@ impl RokuAdapter {
parse_device_info(&xml) parse_device_info(&xml)
} }
/// Probe a Roku device at a known address and port.
pub async fn probe_device(&self, address: IpAddr, port: u16) -> Result<DeviceInfo> {
let base_url = Self::base_url_for(address, port)?;
let info = self.fetch_device_info(&base_url).await?;
Ok(DeviceInfo {
name: info.display_name(),
platform: "roku".to_string(),
address,
port,
})
}
fn dev_base_url(device: &Device) -> Result<Url> { fn dev_base_url(device: &Device) -> Result<Url> {
let host = match device.address { let host = match device.address {
IpAddr::V4(address) => address.to_string(), IpAddr::V4(address) => address.to_string(),
+548 -9
View File
@@ -1,4 +1,29 @@
use clap::{Parser, Subcommand}; use std::{net::IpAddr, path::PathBuf, process::Stdio, time::Duration};
use clap::{Args, CommandFactory, Parser, Subcommand};
use serde::Serialize;
use thiserror::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::UnixStream,
process::Command as TokioCommand,
time::sleep,
};
use crate::{
adapters::{AppInfo, Device},
daemon::{
self,
config::{RuntimePaths, TvctlConfig},
ipc::{
AppListResult, AppRefreshResult, DaemonRequest, DaemonResponse, DaemonStatus,
DiscoveryResult,
},
},
};
const DAEMON_START_WAIT_ATTEMPTS: usize = 20;
const DAEMON_START_WAIT_INTERVAL: Duration = Duration::from_millis(250);
/// The tvctl command-line interface. /// The tvctl command-line interface.
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
@@ -22,14 +47,23 @@ pub struct Cli {
} }
/// The top-level resource namespaces exposed by tvctl. /// The top-level resource namespaces exposed by tvctl.
#[derive(Debug, Subcommand)] #[derive(Debug, Clone, Subcommand)]
pub enum Command { pub enum Command {
/// Manage the background daemon. /// Manage the background daemon.
Daemon, Daemon {
#[command(subcommand)]
command: DaemonCommand,
},
/// Discover and manage devices. /// Discover and manage devices.
Device, Device {
/// List, launch, and stop applications. #[command(subcommand)]
App, command: DeviceCommand,
},
/// List and refresh application metadata.
App {
#[command(subcommand)]
command: AppCommand,
},
/// Send remote control input. /// Send remote control input.
Remote, Remote,
/// Query device state. /// Query device state.
@@ -38,10 +72,515 @@ pub enum Command {
Dev, Dev,
/// Inspect and modify tvctl configuration. /// Inspect and modify tvctl configuration.
Config, Config,
/// Internal daemon entry point used by `tvctl daemon start`.
#[command(hide = true, name = "__daemon_serve")]
InternalDaemonServe,
} }
/// Parse the CLI and return successfully for the repository scaffold. /// Manage the tvctld lifecycle.
pub async fn run() -> anyhow::Result<()> { #[derive(Debug, Clone, Subcommand)]
let _ = Cli::parse(); pub enum DaemonCommand {
/// Start the background daemon process.
Start,
/// Stop the running daemon process.
Stop,
/// Restart the running daemon process.
Restart,
/// Show whether the daemon is running.
Status,
}
/// Discover and inspect known devices.
#[derive(Debug, Clone, Subcommand)]
pub enum DeviceCommand {
/// List devices currently known to the daemon.
List,
/// Trigger a fresh discovery scan.
Discover,
/// Manually add a device by probing its address.
Add(DeviceAddArgs),
/// Show one known device by name or UUID.
Info {
/// The friendly name or UUID to inspect.
target: String,
},
/// Remove one known device by name or UUID.
Remove {
/// The friendly name or UUID to remove.
target: String,
},
/// Mark one known device as the default target.
Select {
/// The friendly name or UUID to make default.
target: String,
},
}
/// Arguments for `tvctl device add`.
#[derive(Debug, Clone, Args)]
pub struct DeviceAddArgs {
/// The normalized platform identifier, currently `roku`.
#[arg(long)]
pub platform: String,
/// The device IP address.
#[arg(long)]
pub address: IpAddr,
/// Optional platform port override.
#[arg(long)]
pub port: Option<u16>,
/// Optional user-assigned friendly name.
#[arg(long)]
pub name: Option<String>,
}
/// App-cache commands.
#[derive(Debug, Clone, Subcommand)]
pub enum AppCommand {
/// List cached apps for the selected or default device platform.
List,
/// Refresh the cached app list from the selected or default device.
Refresh {
/// Clear the platform cache before reloading from the device.
#[arg(long)]
clear: bool,
},
}
/// A user-facing CLI error with a suggested next action.
#[derive(Debug, Error)]
#[error("{message}\nHint: {hint}")]
pub struct CliError {
/// The human-readable error message.
pub message: String,
/// The suggested next action.
pub hint: String,
}
impl CliError {
fn new(message: impl Into<String>, hint: impl Into<String>) -> Self {
Self {
message: message.into(),
hint: hint.into(),
}
}
}
/// Parse the CLI and execute the selected command.
pub async fn run() -> Result<(), CliError> {
let cli = Cli::parse();
let Some(command) = cli.command.clone() else {
let mut cmd = Cli::command();
cmd.print_long_help().map_err(|error| {
CliError::new(
format!("Failed to render CLI help: {error}"),
"Retry the command or inspect the terminal output.",
)
})?;
println!();
return Ok(());
};
match command {
Command::InternalDaemonServe => daemon::serve().await.map_err(|error| {
CliError::new(error.to_string(), "Inspect the daemon logs and retry.")
}),
Command::Daemon { command } => handle_daemon_command(&cli, command).await,
Command::Device { command } => handle_device_command(&cli, command).await,
Command::App { command } => handle_app_command(&cli, command).await,
Command::Remote => Err(CliError::new(
"Remote commands are not wired to the daemon yet.",
"Continue Milestone 4 after the daemon protocol is in place.",
)),
Command::State => Err(CliError::new(
"State queries are not wired to the daemon yet.",
"Continue Milestone 4 after the daemon protocol is in place.",
)),
Command::Dev => Err(CliError::new(
"Developer commands are not wired to the daemon yet.",
"Continue Milestone 4 after the daemon protocol is in place.",
)),
Command::Config => Err(CliError::new(
"Config commands are not wired to the daemon yet.",
"Continue Milestone 4 after the daemon protocol is in place.",
)),
}
}
async fn handle_daemon_command(cli: &Cli, command: DaemonCommand) -> Result<(), CliError> {
match command {
DaemonCommand::Start => daemon_start(cli).await,
DaemonCommand::Stop => daemon_stop(cli).await,
DaemonCommand::Restart => {
if daemon_status_payload().await.is_some() {
daemon_stop(cli).await?;
}
daemon_start(cli).await
}
DaemonCommand::Status => daemon_status(cli).await,
}
}
async fn handle_device_command(cli: &Cli, command: DeviceCommand) -> Result<(), CliError> {
match command {
DeviceCommand::List => {
let response =
send_request(load_socket_path().await?, &DaemonRequest::ListDevices).await?;
let devices: Vec<Device> = parse_response_data(response)?;
render(cli, &devices, || render_device_list(&devices))
}
DeviceCommand::Discover => {
let response =
send_request(load_socket_path().await?, &DaemonRequest::Discover).await?;
let result: DiscoveryResult = parse_response_data(response)?;
render(cli, &result, || render_discovery_result(&result.devices))
}
DeviceCommand::Add(args) => {
let response = send_request(
load_socket_path().await?,
&DaemonRequest::AddDevice {
platform: args.platform,
address: args.address,
port: args.port,
name: args.name,
},
)
.await?;
let device: Device = parse_response_data(response)?;
render(cli, &device, || {
format!(
"Added {} [{}] {}:{}{}",
device.name,
device.platform,
device.address,
device.port,
default_marker(&device)
)
})
}
DeviceCommand::Info { target } => {
let response = send_request(
load_socket_path().await?,
&DaemonRequest::GetDevice { target },
)
.await?;
let device: Device = parse_response_data(response)?;
render(cli, &device, || render_device_info(&device))
}
DeviceCommand::Remove { target } => {
let response = send_request(
load_socket_path().await?,
&DaemonRequest::RemoveDevice { target },
)
.await?;
let device: Device = parse_response_data(response)?;
render(cli, &device, || format!("Removed {}.", device.name))
}
DeviceCommand::Select { target } => {
let response = send_request(
load_socket_path().await?,
&DaemonRequest::SelectDevice { target },
)
.await?;
let device: Device = parse_response_data(response)?;
render(cli, &device, || {
format!("Default device set to {}.", device.name)
})
}
}
}
async fn handle_app_command(cli: &Cli, command: AppCommand) -> Result<(), CliError> {
match command {
AppCommand::List => {
let response = send_request(
load_socket_path().await?,
&DaemonRequest::ListApps {
device: cli.device.clone(),
platform: None,
},
)
.await?;
let result: AppListResult = parse_response_data(response)?;
render(cli, &result, || {
render_app_list(&result.apps, &result.platform)
})
}
AppCommand::Refresh { clear } => {
let response = send_request(
load_socket_path().await?,
&DaemonRequest::RefreshApps {
device: cli.device.clone(),
clear,
},
)
.await?;
let result: AppRefreshResult = parse_response_data(response)?;
render(cli, &result, || render_app_refresh(&result))
}
}
}
async fn daemon_start(cli: &Cli) -> Result<(), CliError> {
if let Some(status) = daemon_status_payload().await {
return render(cli, &status, || {
format!("tvctld is already running on {}", status.socket)
});
}
let exe = std::env::current_exe().map_err(|error| {
CliError::new(
format!("Unable to locate the tvctl binary: {error}"),
"Run the command from an installed or built tvctl executable.",
)
})?;
TokioCommand::new(exe)
.arg("__daemon_serve")
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.map_err(|error| {
CliError::new(
format!("Failed to launch tvctld: {error}"),
"Check filesystem permissions and try again.",
)
})?;
for _ in 0..DAEMON_START_WAIT_ATTEMPTS {
if let Some(status) = daemon_status_payload().await {
return render(cli, &status, || {
format!("tvctld started on {}", status.socket)
});
}
sleep(DAEMON_START_WAIT_INTERVAL).await;
}
Err(CliError::new(
"tvctld did not become ready in time.",
"Check whether the socket path is writable and retry `tvctl daemon start`.",
))
}
async fn daemon_stop(cli: &Cli) -> Result<(), CliError> {
let socket = load_socket_path().await?;
let response = send_request(socket, &DaemonRequest::Shutdown).await?;
let data: serde_json::Value = parse_response_data(response)?;
render(cli, &data, || "tvctld stopped.".to_string())
}
async fn daemon_status(cli: &Cli) -> Result<(), CliError> {
if let Some(status) = daemon_status_payload().await {
return render(cli, &status, || {
format!(
"tvctld is running on {} with {} known device(s).",
status.socket, status.device_count
)
});
}
if cli.json {
let status = serde_json::json!({
"running": false,
});
return render(cli, &status, || "tvctld is not running.".to_string());
}
println!("tvctld is not running.");
Ok(()) Ok(())
} }
async fn daemon_status_payload() -> Option<DaemonStatus> {
let socket = load_socket_path().await.ok()?;
let response = send_request(socket, &DaemonRequest::Ping).await.ok()?;
parse_response_data(response).ok()
}
async fn load_socket_path() -> Result<PathBuf, CliError> {
let config = TvctlConfig::load().await.map_err(|error| {
CliError::new(
format!("Failed to load tvctl configuration: {error}"),
"Inspect ~/.config/tvctl/config.toml for invalid TOML.",
)
})?;
let fallback = RuntimePaths::detect().socket_file;
let configured = PathBuf::from(config.daemon.socket);
Ok(if configured.as_os_str().is_empty() {
fallback
} else {
configured
})
}
async fn send_request(
socket_path: PathBuf,
request: &DaemonRequest,
) -> Result<DaemonResponse, CliError> {
let mut stream = UnixStream::connect(&socket_path).await.map_err(|error| {
CliError::new(
format!(
"Unable to reach tvctld at {}: {error}",
socket_path.display()
),
"Run `tvctl daemon start` first.",
)
})?;
let bytes = serde_json::to_vec(request).map_err(|error| {
CliError::new(
format!("Failed to serialize daemon request: {error}"),
"Inspect the CLI build and retry.",
)
})?;
stream.write_all(&bytes).await.map_err(|error| {
CliError::new(
format!("Failed to write request to tvctld: {error}"),
"Check the daemon socket permissions and retry.",
)
})?;
stream.shutdown().await.map_err(|error| {
CliError::new(
format!("Failed to finish the daemon request: {error}"),
"Retry the command after restarting the daemon.",
)
})?;
let mut response_bytes = Vec::new();
stream
.read_to_end(&mut response_bytes)
.await
.map_err(|error| {
CliError::new(
format!("Failed to read the daemon response: {error}"),
"Retry the command after restarting the daemon.",
)
})?;
let response = serde_json::from_slice::<DaemonResponse>(&response_bytes).map_err(|error| {
CliError::new(
format!("Failed to decode the daemon response: {error}"),
"Ensure the CLI and daemon are from the same build.",
)
})?;
if let Some(error) = &response.error {
return Err(CliError::new(
error.message.clone(),
error.hint.clone().unwrap_or_else(|| {
"Retry the command after checking the daemon state.".to_string()
}),
));
}
Ok(response)
}
fn parse_response_data<T>(response: DaemonResponse) -> Result<T, CliError>
where
T: serde::de::DeserializeOwned,
{
let data = response.data.ok_or_else(|| {
CliError::new(
"The daemon response did not include data.",
"Ensure the CLI and daemon are from the same build.",
)
})?;
serde_json::from_value(data).map_err(|error| {
CliError::new(
format!("Failed to decode daemon payload: {error}"),
"Ensure the CLI and daemon are from the same build.",
)
})
}
fn render<T>(cli: &Cli, data: &T, human: impl FnOnce() -> String) -> Result<(), CliError>
where
T: Serialize,
{
if cli.json {
let json = serde_json::to_string_pretty(data).map_err(|error| {
CliError::new(
format!("Failed to encode JSON output: {error}"),
"Retry the command or inspect the output type.",
)
})?;
println!("{json}");
} else {
println!("{}", human());
}
Ok(())
}
fn render_device_list(devices: &[Device]) -> String {
if devices.is_empty() {
return "No devices are registered yet.".to_string();
}
devices
.iter()
.map(|device| {
format!(
"{} [{}] {}:{}{}",
device.name,
device.platform,
device.address,
device.port,
default_marker(device)
)
})
.collect::<Vec<_>>()
.join("\n")
}
fn render_discovery_result(devices: &[Device]) -> String {
if devices.is_empty() {
return "Discovery completed, but no devices were found.".to_string();
}
format!(
"Discovered {} device(s).\n{}",
devices.len(),
render_device_list(devices)
)
}
fn render_device_info(device: &Device) -> String {
[
format!("Name: {}", device.name),
format!("Original Name: {}", device.original_name),
format!("UUID: {}", device.id),
format!("Platform: {}", device.platform),
format!("Address: {}:{}", device.address, device.port),
format!("Default: {}", device.is_default),
format!("Discovered At: {}", device.discovered_at),
format!("Last Seen: {}", device.last_seen),
]
.join("\n")
}
fn render_app_list(apps: &[AppInfo], platform: &str) -> String {
if apps.is_empty() {
return format!("No cached apps are known yet for platform {platform}.");
}
format!(
"Cached apps for {platform}:\n{}",
apps.iter()
.map(|app| format!("{} [{}]", app.name, app.platform_id))
.collect::<Vec<_>>()
.join("\n")
)
}
fn render_app_refresh(result: &AppRefreshResult) -> String {
format!(
"Refreshed {} app(s) from {}. {} cached app(s) are now known for {}.",
result.refreshed_count, result.device.name, result.cached_count, result.platform
)
}
fn default_marker(device: &Device) -> &'static str {
if device.is_default { " (default)" } else { "" }
}
+185
View File
@@ -1,3 +1,7 @@
use std::{collections::BTreeMap, path::PathBuf};
use tokio::fs;
use crate::adapters::AppInfo; use crate::adapters::AppInfo;
/// A platform-level cache of app metadata discovered from live devices. /// A platform-level cache of app metadata discovered from live devices.
@@ -8,3 +12,184 @@ pub struct AppCache {
/// The apps currently known for that platform. /// The apps currently known for that platform.
pub apps: Vec<AppInfo>, pub apps: Vec<AppInfo>,
} }
/// Persisted store for per-platform app cache files.
#[derive(Debug, Clone)]
pub struct AppCacheStore {
root_dir: PathBuf,
}
impl AppCacheStore {
/// Create a cache store rooted at the given directory.
pub fn new(root_dir: PathBuf) -> Self {
Self { root_dir }
}
/// Load the app cache for a single platform or return an empty cache when absent.
pub async fn load_platform(&self, platform: &str) -> anyhow::Result<AppCache> {
let path = self.platform_path(platform);
let apps = match fs::read_to_string(&path).await {
Ok(contents) => serde_json::from_str::<Vec<AppInfo>>(&contents)?,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Vec::new(),
Err(error) => return Err(error.into()),
};
Ok(AppCache {
platform: platform.to_string(),
apps,
})
}
/// Persist the app list for a platform.
pub async fn save_platform(&self, platform: &str, apps: &[AppInfo]) -> anyhow::Result<()> {
fs::create_dir_all(&self.root_dir).await?;
let contents = serde_json::to_string_pretty(apps)?;
fs::write(self.platform_path(platform), contents).await?;
Ok(())
}
/// Merge newly seen apps into the normalized, de-duplicated platform cache.
pub async fn record_platform_apps(
&self,
platform: &str,
apps: Vec<AppInfo>,
) -> anyhow::Result<AppCache> {
let mut deduped = BTreeMap::new();
for app in self.load_platform(platform).await?.apps {
deduped.insert(app.platform_id.clone(), app);
}
for app in apps {
deduped.insert(app.platform_id.clone(), app);
}
let apps: Vec<AppInfo> = deduped.into_values().collect();
self.save_platform(platform, &apps).await?;
Ok(AppCache {
platform: platform.to_string(),
apps,
})
}
/// Resolve an app by case-insensitive name or exact ID from the persisted platform cache.
pub async fn find_app(&self, platform: &str, query: &str) -> anyhow::Result<Option<AppInfo>> {
let cache = self.load_platform(platform).await?;
let normalized = query.to_ascii_lowercase();
Ok(cache.apps.into_iter().find(|app| {
app.platform_id == query
|| app.id == query
|| app.name.to_ascii_lowercase() == normalized
}))
}
/// Remove the persisted app cache for a platform.
pub async fn clear_platform(&self, platform: &str) -> anyhow::Result<()> {
match fs::remove_file(self.platform_path(platform)).await {
Ok(()) => Ok(()),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(error) => Err(error.into()),
}
}
fn platform_path(&self, platform: &str) -> PathBuf {
self.root_dir.join(format!("{platform}.apps.json"))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn cache_round_trips_and_resolves_names() {
let temp_dir = tempfile::tempdir().expect("temp dir should exist");
let store = AppCacheStore::new(temp_dir.path().join("cache"));
let apps = vec![
AppInfo {
id: "12".to_string(),
name: "Netflix".to_string(),
version: None,
platform_id: "12".to_string(),
},
AppInfo {
id: "837".to_string(),
name: "YouTube".to_string(),
version: None,
platform_id: "837".to_string(),
},
];
store
.record_platform_apps("roku", apps.clone())
.await
.expect("apps should save");
let loaded = store.load_platform("roku").await.expect("apps should load");
assert_eq!(loaded.apps, apps);
let resolved = store
.find_app("roku", "youtube")
.await
.expect("app lookup should work")
.expect("youtube should exist");
assert_eq!(resolved.platform_id, "837");
}
#[tokio::test]
async fn record_platform_apps_merges_existing_entries() {
let temp_dir = tempfile::tempdir().expect("temp dir should exist");
let store = AppCacheStore::new(temp_dir.path().join("cache"));
store
.record_platform_apps(
"roku",
vec![AppInfo {
id: "12".to_string(),
name: "Netflix".to_string(),
version: None,
platform_id: "12".to_string(),
}],
)
.await
.expect("first cache should save");
store
.record_platform_apps(
"roku",
vec![AppInfo {
id: "837".to_string(),
name: "YouTube".to_string(),
version: None,
platform_id: "837".to_string(),
}],
)
.await
.expect("second cache should merge");
let loaded = store.load_platform("roku").await.expect("apps should load");
assert_eq!(loaded.apps.len(), 2);
}
#[tokio::test]
async fn clear_platform_removes_persisted_cache() {
let temp_dir = tempfile::tempdir().expect("temp dir should exist");
let store = AppCacheStore::new(temp_dir.path().join("cache"));
store
.save_platform(
"roku",
&[AppInfo {
id: "12".to_string(),
name: "Netflix".to_string(),
version: None,
platform_id: "12".to_string(),
}],
)
.await
.expect("apps should save");
store
.clear_platform("roku")
.await
.expect("cache should clear");
let loaded = store.load_platform("roku").await.expect("apps should load");
assert!(loaded.apps.is_empty());
}
}
+235
View File
@@ -0,0 +1,235 @@
use std::{
env,
path::{Path, PathBuf},
};
use serde::{Deserialize, Serialize};
use tokio::fs;
/// The complete daemon configuration loaded from TOML.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TvctlConfig {
/// Runtime daemon settings.
pub daemon: DaemonConfig,
/// Discovery scan behavior.
pub discovery: DiscoveryConfig,
/// Default-device settings.
pub devices: DeviceConfig,
/// Developer tooling toggles.
pub dev: DevConfig,
}
impl Default for TvctlConfig {
fn default() -> Self {
Self {
daemon: DaemonConfig::default(),
discovery: DiscoveryConfig::default(),
devices: DeviceConfig::default(),
dev: DevConfig::default(),
}
}
}
impl TvctlConfig {
/// Load configuration from the default XDG path or return defaults when absent.
pub async fn load() -> anyhow::Result<Self> {
Self::load_from_path(&default_config_path()).await
}
/// Load configuration from a specific path or return defaults when absent.
pub async fn load_from_path(path: &Path) -> anyhow::Result<Self> {
match fs::read_to_string(path).await {
Ok(contents) => Ok(toml::from_str::<Self>(&contents)?),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
Err(error) => Err(error.into()),
}
}
/// Persist configuration to a specific path, creating parent directories first.
pub async fn save_to_path(&self, path: &Path) -> anyhow::Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let contents = toml::to_string_pretty(self)?;
fs::write(path, contents).await?;
Ok(())
}
}
/// Runtime daemon settings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DaemonConfig {
/// Unix socket path for CLI communication.
pub socket: String,
/// Whether the HTTP API is enabled.
pub http_enabled: bool,
/// Loopback HTTP port.
pub http_port: u16,
/// Loopback host or bind address.
pub http_host: String,
/// Logging level.
pub log_level: String,
}
impl Default for DaemonConfig {
fn default() -> Self {
Self {
socket: default_socket_path().to_string_lossy().into_owned(),
http_enabled: true,
http_port: 7272,
http_host: "127.0.0.1".to_string(),
log_level: "info".to_string(),
}
}
}
/// Discovery scan behavior.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DiscoveryConfig {
/// Whether discovery runs automatically on daemon start.
pub auto_discover: bool,
/// How often discovery repeats, in seconds.
pub interval_secs: u64,
/// Per-device timeout in seconds.
pub timeout_secs: u64,
}
impl Default for DiscoveryConfig {
fn default() -> Self {
Self {
auto_discover: true,
interval_secs: 300,
timeout_secs: 5,
}
}
}
/// Default-device settings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(default)]
pub struct DeviceConfig {
/// The default device name or UUID.
pub default: String,
}
/// Developer tooling toggles.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DevConfig {
/// Whether developer tooling is enabled.
pub enabled: bool,
}
impl Default for DevConfig {
fn default() -> Self {
Self { enabled: true }
}
}
/// Canonical runtime paths used by the daemon.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimePaths {
/// The configuration file path.
pub config_file: PathBuf,
/// The data directory root.
pub data_dir: PathBuf,
/// The device registry path.
pub devices_file: PathBuf,
/// The platform app cache directory.
pub cache_dir: PathBuf,
/// The runtime socket path.
pub socket_file: PathBuf,
}
impl RuntimePaths {
/// Build the canonical path set from XDG defaults.
pub fn detect() -> Self {
let config_file = default_config_path();
let data_dir = default_data_dir();
let cache_dir = data_dir.join("cache");
Self {
config_file,
devices_file: data_dir.join("devices.json"),
socket_file: default_socket_path(),
data_dir,
cache_dir,
}
}
}
/// Return the default config file path.
pub fn default_config_path() -> PathBuf {
default_config_dir().join("config.toml")
}
/// Return the default config directory path.
pub fn default_config_dir() -> PathBuf {
if let Ok(path) = env::var("XDG_CONFIG_HOME") {
return PathBuf::from(path).join("tvctl");
}
home_dir().join(".config/tvctl")
}
/// Return the default data directory path.
pub fn default_data_dir() -> PathBuf {
if let Ok(path) = env::var("XDG_DATA_HOME") {
return PathBuf::from(path).join("tvctl");
}
home_dir().join(".local/share/tvctl")
}
/// Return the default runtime socket path.
pub fn default_socket_path() -> PathBuf {
let runtime_dir = env::var("XDG_RUNTIME_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from(format!("/run/user/{}", current_uid())));
runtime_dir.join("tvctl.sock")
}
fn home_dir() -> PathBuf {
env::var("HOME")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/tmp"))
}
fn current_uid() -> u32 {
// SAFETY: geteuid reads process state and has no side effects.
unsafe { libc::geteuid() }
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn missing_config_uses_defaults() {
let temp_dir = tempfile::tempdir().expect("temp dir should exist");
let config = TvctlConfig::load_from_path(&temp_dir.path().join("missing.toml"))
.await
.expect("default config should load");
assert_eq!(config, TvctlConfig::default());
}
#[tokio::test]
async fn config_round_trips() {
let temp_dir = tempfile::tempdir().expect("temp dir should exist");
let path = temp_dir.path().join("config.toml");
let config = TvctlConfig {
devices: DeviceConfig {
default: "living-room".to_string(),
},
..TvctlConfig::default()
};
config
.save_to_path(&path)
.await
.expect("config should save");
let loaded = TvctlConfig::load_from_path(&path)
.await
.expect("config should load");
assert_eq!(loaded, config);
}
}
+40 -2
View File
@@ -1,3 +1,41 @@
use anyhow::Context;
use crate::adapters::Device;
use super::registry::{AdapterRegistry, DeviceRegistry};
/// Background discovery orchestration for supported TV platforms. /// Background discovery orchestration for supported TV platforms.
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone)]
pub struct DiscoveryService; pub struct DiscoveryService {
adapters: AdapterRegistry,
}
impl DiscoveryService {
/// Create a discovery service over the registered adapters.
pub fn new(adapters: AdapterRegistry) -> Self {
Self { adapters }
}
/// Discover all supported platforms and merge them into the registry.
pub async fn discover_all(&self, registry: &mut DeviceRegistry) -> anyhow::Result<Vec<Device>> {
let mut discovered = Vec::new();
for platform in self.adapters.supported_platforms() {
let mut devices = self
.discover_platform(platform, registry)
.await
.with_context(|| format!("failed discovery for platform '{platform}'"))?;
discovered.append(&mut devices);
}
Ok(discovered)
}
/// Discover one platform and merge the results into the registry.
pub async fn discover_platform(
&self,
platform: &str,
registry: &mut DeviceRegistry,
) -> anyhow::Result<Vec<Device>> {
let discovered = self.adapters.discover(platform).await?;
Ok(registry.merge_discovered(discovered))
}
}
+150
View File
@@ -0,0 +1,150 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::net::IpAddr;
use crate::adapters::{AppInfo, Device};
/// A request sent from the CLI to the daemon over the Unix socket.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "command", rename_all = "snake_case")]
pub enum DaemonRequest {
/// Check whether the daemon is alive.
Ping,
/// Ask the daemon to shut down cleanly.
Shutdown,
/// Trigger discovery across supported platforms.
Discover,
/// List all known devices in the registry.
ListDevices,
/// Return one known device by UUID or friendly name.
GetDevice {
/// The UUID or friendly name to resolve.
target: String,
},
/// Manually add a device by probing the provided address.
AddDevice {
/// The normalized platform identifier.
platform: String,
/// The device IP address.
address: IpAddr,
/// Optional platform port override.
port: Option<u16>,
/// Optional user-assigned friendly name.
name: Option<String>,
},
/// Remove one known device by UUID or friendly name.
RemoveDevice {
/// The UUID or friendly name to remove.
target: String,
},
/// Mark one known device as the default target.
SelectDevice {
/// The UUID or friendly name to select.
target: String,
},
/// Return cached apps for a platform or target device.
ListApps {
/// Optional UUID or friendly name.
device: Option<String>,
/// Optional platform override when no device is provided.
platform: Option<String>,
},
/// Refresh cached apps for one device.
RefreshApps {
/// Optional UUID or friendly name.
device: Option<String>,
/// Whether to clear the platform cache before reloading from the device.
clear: bool,
},
}
/// A standard daemon response envelope for IPC.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DaemonResponse {
/// Whether the request succeeded.
pub ok: bool,
/// Success payload data.
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
/// Structured error payload.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<DaemonError>,
}
impl DaemonResponse {
/// Construct a success response with JSON payload.
pub fn success<T: Serialize>(data: T) -> Self {
Self {
ok: true,
data: Some(serde_json::to_value(data).unwrap_or(Value::Null)),
error: None,
}
}
/// Construct an error response.
pub fn error(code: &str, message: impl Into<String>, hint: impl Into<Option<String>>) -> Self {
Self {
ok: false,
data: None,
error: Some(DaemonError {
code: code.to_string(),
message: message.into(),
hint: hint.into(),
}),
}
}
}
/// A stable IPC error payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DaemonError {
/// Stable machine-readable error code.
pub code: String,
/// Human-readable error message.
pub message: String,
/// Suggested next action.
#[serde(skip_serializing_if = "Option::is_none")]
pub hint: Option<String>,
}
/// A daemon health/status payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DaemonStatus {
/// The daemon process ID.
pub pid: u32,
/// The socket path being served.
pub socket: String,
/// Whether the HTTP API is enabled.
pub http_enabled: bool,
/// The number of known devices.
pub device_count: usize,
}
/// Discovery results returned by the daemon.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DiscoveryResult {
/// The devices discovered during this scan.
pub devices: Vec<Device>,
}
/// Cached app-list payload returned by the daemon.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AppListResult {
/// The platform the cache belongs to.
pub platform: String,
/// The currently cached apps for that platform.
pub apps: Vec<AppInfo>,
}
/// App refresh results returned by the daemon.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AppRefreshResult {
/// The device used for the refresh.
pub device: Device,
/// The platform cache that was updated.
pub platform: String,
/// The number of apps returned by the live device.
pub refreshed_count: usize,
/// The total number of cached apps after merge/replace.
pub cached_count: usize,
}
+516 -2
View File
@@ -1,8 +1,522 @@
pub mod cache; pub mod cache;
pub mod config;
pub mod discovery; pub mod discovery;
pub mod ipc;
pub mod registry; pub mod registry;
pub mod state; pub mod state;
use std::{
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use cache::AppCacheStore;
use config::{RuntimePaths, TvctlConfig};
use discovery::DiscoveryService;
use ipc::{
AppListResult, AppRefreshResult, DaemonRequest, DaemonResponse, DaemonStatus, DiscoveryResult,
};
use registry::{AdapterRegistry, DeviceRegistry};
use state::StateCache;
use tokio::{
fs,
io::{AsyncReadExt, AsyncWriteExt},
net::{UnixListener, UnixStream},
sync::Mutex,
time::{self, MissedTickBehavior},
};
use tracing::warn;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use crate::adapters::Device;
/// The long-lived tvctld process. /// The long-lived tvctld process.
#[derive(Debug, Default)] #[derive(Debug)]
pub struct Daemon; pub struct Daemon {
/// The loaded daemon configuration.
pub config: TvctlConfig,
/// The canonical runtime path set.
pub paths: RuntimePaths,
/// Persisted known devices.
pub registry: DeviceRegistry,
/// Persisted platform app metadata.
pub app_cache: AppCacheStore,
/// In-memory state snapshots.
pub state_cache: StateCache,
/// Available platform adapters.
pub adapters: AdapterRegistry,
/// Discovery orchestration over registered adapters.
pub discovery: DiscoveryService,
}
impl Daemon {
/// Load the daemon's persisted state and adapter registry.
pub async fn load() -> anyhow::Result<Self> {
let config = TvctlConfig::load().await?;
let mut paths = RuntimePaths::detect();
if !config.daemon.socket.is_empty() {
paths.socket_file = PathBuf::from(&config.daemon.socket);
}
let adapters = AdapterRegistry::default();
let mut registry = DeviceRegistry::load(paths.devices_file.clone()).await?;
if !config.devices.default.is_empty() {
let _ = registry.set_default(&config.devices.default);
} else {
registry.ensure_default();
}
let app_cache = AppCacheStore::new(paths.cache_dir.clone());
let discovery = DiscoveryService::new(adapters.clone());
Ok(Self {
config,
paths,
registry,
app_cache,
state_cache: StateCache::default(),
adapters,
discovery,
})
}
}
/// Run the long-lived daemon loop over a Unix socket.
pub async fn serve() -> anyhow::Result<()> {
let daemon = Arc::new(Mutex::new(Daemon::load().await?));
{
let mut guard = daemon.lock().await;
if guard.config.discovery.auto_discover {
run_discovery(&mut guard).await?;
}
}
let (socket_path, interval_secs) = {
let guard = daemon.lock().await;
(
guard.paths.socket_file.clone(),
guard.config.discovery.interval_secs,
)
};
if let Some(parent) = socket_path.parent() {
fs::create_dir_all(parent).await?;
}
let _ = fs::remove_file(&socket_path).await;
let listener = UnixListener::bind(&socket_path)?;
set_socket_permissions(&socket_path).await?;
let mut discovery_interval = discovery_interval(interval_secs);
if let Some(interval) = discovery_interval.as_mut() {
interval.tick().await;
}
loop {
if let Some(interval) = discovery_interval.as_mut() {
tokio::select! {
_ = interval.tick() => {
let mut guard = daemon.lock().await;
if let Err(error) = run_discovery(&mut guard).await {
warn!("Periodic discovery failed: {error}");
}
}
accepted = listener.accept() => {
let (stream, _) = accepted?;
let should_stop = handle_connection(stream, daemon.clone()).await?;
if should_stop {
break;
}
}
}
} else {
let (stream, _) = listener.accept().await?;
let should_stop = handle_connection(stream, daemon.clone()).await?;
if should_stop {
break;
}
}
}
let _ = fs::remove_file(&socket_path).await;
Ok(())
}
async fn set_socket_permissions(path: &Path) -> anyhow::Result<()> {
#[cfg(unix)]
{
fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)).await?;
}
Ok(())
}
async fn handle_connection(
mut stream: UnixStream,
daemon: Arc<Mutex<Daemon>>,
) -> anyhow::Result<bool> {
let mut request_bytes = Vec::new();
stream.read_to_end(&mut request_bytes).await?;
let request = match serde_json::from_slice::<DaemonRequest>(&request_bytes) {
Ok(request) => request,
Err(error) => {
let response = DaemonResponse::error(
"invalid_request",
format!("Invalid daemon request: {error}"),
Some("Upgrade the CLI or inspect the daemon socket protocol.".to_string()),
);
write_response(&mut stream, &response).await?;
return Ok(false);
}
};
let (response, should_stop) = handle_request(request, daemon).await;
write_response(&mut stream, &response).await?;
Ok(should_stop)
}
async fn write_response(stream: &mut UnixStream, response: &DaemonResponse) -> anyhow::Result<()> {
let bytes = serde_json::to_vec(response)?;
stream.write_all(&bytes).await?;
stream.shutdown().await?;
Ok(())
}
async fn handle_request(
request: DaemonRequest,
daemon: Arc<Mutex<Daemon>>,
) -> (DaemonResponse, bool) {
match request {
DaemonRequest::Ping => {
let guard = daemon.lock().await;
(
DaemonResponse::success(DaemonStatus {
pid: std::process::id(),
socket: guard.paths.socket_file.display().to_string(),
http_enabled: guard.config.daemon.http_enabled,
device_count: guard.registry.devices.len(),
}),
false,
)
}
DaemonRequest::Shutdown => (
DaemonResponse::success(serde_json::json!({
"message": "Daemon shutdown requested."
})),
true,
),
DaemonRequest::Discover => {
let mut guard = daemon.lock().await;
match run_discovery(&mut guard).await {
Ok(devices) => (DaemonResponse::success(DiscoveryResult { devices }), false),
Err(error) => (
DaemonResponse::error(
"discovery_failed",
format!("Device discovery failed: {error}"),
Some(
"Verify SSDP works on this network or add the device manually."
.to_string(),
),
),
false,
),
}
}
DaemonRequest::ListDevices => {
let guard = daemon.lock().await;
(
DaemonResponse::success(guard.registry.devices.clone()),
false,
)
}
DaemonRequest::GetDevice { target } => {
let guard = daemon.lock().await;
match guard.registry.find(&target).cloned() {
Some(device) => (DaemonResponse::success(device), false),
None => (
DaemonResponse::error(
"device_not_found",
format!("Device '{target}' is not in the registry."),
Some(
"Run `tvctl device list` or `tvctl device discover` first.".to_string(),
),
),
false,
),
}
}
DaemonRequest::AddDevice {
platform,
address,
port,
name,
} => {
let mut guard = daemon.lock().await;
match guard.adapters.probe_manual(&platform, address, port).await {
Ok(info) => {
let device = guard.registry.merge_manual(info, name);
if let Err(error) = sync_registry_config(&mut guard).await {
return (
DaemonResponse::error(
"registry_save_failed",
error,
Some(
"Check permissions for the tvctl config and data directories."
.to_string(),
),
),
false,
);
}
(DaemonResponse::success(device), false)
}
Err(error) => (
DaemonResponse::error(
"manual_add_failed",
format!(
"Could not add {platform} device at {address}: {}",
error.root_cause()
),
Some(
"Verify the platform, IP, and port, then make sure the TV is reachable."
.to_string(),
),
),
false,
),
}
}
DaemonRequest::RemoveDevice { target } => {
let mut guard = daemon.lock().await;
match guard.registry.remove(&target) {
Some(device) => {
if let Err(error) = sync_registry_config(&mut guard).await {
return (
DaemonResponse::error(
"registry_save_failed",
error,
Some(
"Check permissions for the tvctl config and data directories."
.to_string(),
),
),
false,
);
}
(DaemonResponse::success(device), false)
}
None => (
DaemonResponse::error(
"device_not_found",
format!("Device '{target}' is not in the registry."),
Some(
"Run `tvctl device list` to confirm the device name or UUID."
.to_string(),
),
),
false,
),
}
}
DaemonRequest::SelectDevice { target } => {
let mut guard = daemon.lock().await;
match guard.registry.set_default(&target) {
Some(device) => {
guard.config.devices.default = device.id.to_string();
if let Err(error) = sync_registry_config(&mut guard).await {
return (
DaemonResponse::error(
"config_save_failed",
error,
Some(
"Check permissions for the tvctl config and data directories."
.to_string(),
),
),
false,
);
}
(DaemonResponse::success(device), false)
}
None => (
DaemonResponse::error(
"device_not_found",
format!("Device '{target}' is not in the registry."),
Some(
"Run `tvctl device list` to confirm the device name or UUID."
.to_string(),
),
),
false,
),
}
}
DaemonRequest::ListApps { device, platform } => {
let guard = daemon.lock().await;
let platform = match (device.as_deref(), platform) {
(Some(target), platform) => {
let device = match resolve_target_device(&guard.registry, Some(target)) {
Ok(device) => device,
Err(response) => return (response, false),
};
let platform = platform.unwrap_or_else(|| device.platform.clone());
if platform != device.platform {
return (
DaemonResponse::error(
"platform_mismatch",
format!(
"Requested platform '{platform}' does not match device platform '{}'.",
device.platform
),
Some("Omit `--platform` or choose a device on the requested platform.".to_string()),
),
false,
);
}
platform
}
(None, Some(platform)) => platform,
(None, None) => match resolve_target_device(&guard.registry, None) {
Ok(device) => device.platform,
Err(response) => return (response, false),
},
};
match guard.app_cache.load_platform(&platform).await {
Ok(cache) => (
DaemonResponse::success(AppListResult {
platform,
apps: cache.apps,
}),
false,
),
Err(error) => (
DaemonResponse::error(
"app_cache_load_failed",
format!("Failed to load the cached app list: {error}"),
Some("Refresh the app cache or check filesystem permissions.".to_string()),
),
false,
),
}
}
DaemonRequest::RefreshApps { device, clear } => {
let guard = daemon.lock().await;
let device = match resolve_target_device(&guard.registry, device.as_deref()) {
Ok(device) => device,
Err(response) => return (response, false),
};
if clear {
if let Err(error) = guard.app_cache.clear_platform(&device.platform).await {
return (
DaemonResponse::error(
"app_cache_clear_failed",
format!("Failed to clear the cached app list: {error}"),
Some("Check permissions for ~/.local/share/tvctl/cache.".to_string()),
),
false,
);
}
}
match guard.adapters.list_apps(&device).await {
Ok(apps) => match guard
.app_cache
.record_platform_apps(&device.platform, apps.clone())
.await
{
Ok(cache) => (
DaemonResponse::success(AppRefreshResult {
device,
platform: cache.platform,
refreshed_count: apps.len(),
cached_count: cache.apps.len(),
}),
false,
),
Err(error) => (
DaemonResponse::error(
"app_cache_save_failed",
format!("App refresh succeeded but cache persistence failed: {error}"),
Some("Check permissions for ~/.local/share/tvctl/cache.".to_string()),
),
false,
),
},
Err(error) => (
DaemonResponse::error(
"app_refresh_failed",
format!("Failed to fetch apps from {}: {error}", device.name),
Some("Verify the TV is online and supports app listing.".to_string()),
),
false,
),
}
}
}
}
async fn run_discovery(daemon: &mut Daemon) -> anyhow::Result<Vec<Device>> {
let discovery = daemon.discovery.clone();
let devices = discovery.discover_all(&mut daemon.registry).await?;
if !daemon.config.devices.default.is_empty() {
let _ = daemon.registry.set_default(&daemon.config.devices.default);
} else {
daemon.registry.ensure_default();
}
sync_registry_config(daemon)
.await
.map_err(anyhow::Error::msg)?;
Ok(devices)
}
async fn sync_registry_config(daemon: &mut Daemon) -> Result<(), String> {
daemon.registry.ensure_default();
daemon.config.devices.default = daemon
.registry
.default_device()
.map(|device| device.id.to_string())
.unwrap_or_default();
daemon
.registry
.save()
.await
.map_err(|error| format!("Failed to save the device registry: {error}"))?;
daemon
.config
.save_to_path(&daemon.paths.config_file)
.await
.map_err(|error| format!("Failed to save the tvctl config: {error}"))?;
Ok(())
}
fn resolve_target_device(
registry: &DeviceRegistry,
target: Option<&str>,
) -> Result<Device, DaemonResponse> {
match target {
Some(target) => registry.find(target).cloned().ok_or_else(|| {
DaemonResponse::error(
"device_not_found",
format!("Device '{target}' is not in the registry."),
Some("Run `tvctl device list` to confirm the device name or UUID.".to_string()),
)
}),
None => registry.default_device().cloned().ok_or_else(|| {
DaemonResponse::error(
"no_default_device",
"No default device is configured yet.".to_string(),
Some("Run `tvctl device discover` or `tvctl device add`, then `tvctl device select`.".to_string()),
)
}),
}
}
fn discovery_interval(interval_secs: u64) -> Option<time::Interval> {
if interval_secs == 0 {
return None;
}
let mut interval = time::interval(Duration::from_secs(interval_secs));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
Some(interval)
}
+390 -2
View File
@@ -1,8 +1,396 @@
use crate::adapters::Device; use std::{net::IpAddr, path::PathBuf};
use anyhow::Context;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio::fs;
use uuid::Uuid;
use crate::adapters::{Device, DeviceInfo, TvAdapter, roku::RokuAdapter};
/// The persisted collection of known devices. /// The persisted collection of known devices.
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceRegistry { pub struct DeviceRegistry {
path: PathBuf,
/// All devices currently remembered by the daemon. /// All devices currently remembered by the daemon.
pub devices: Vec<Device>, pub devices: Vec<Device>,
} }
impl DeviceRegistry {
/// Load the registry from disk or return an empty registry when absent.
pub async fn load(path: PathBuf) -> anyhow::Result<Self> {
let devices = match fs::read_to_string(&path).await {
Ok(contents) => serde_json::from_str::<Vec<Device>>(&contents).with_context(|| {
format!("failed to parse device registry at {}", path.display())
})?,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Vec::new(),
Err(error) => return Err(error.into()),
};
Ok(Self { path, devices })
}
/// Persist the current registry to disk.
pub async fn save(&self) -> anyhow::Result<()> {
if let Some(parent) = self.path.parent() {
fs::create_dir_all(parent).await?;
}
let contents = serde_json::to_string_pretty(&self.devices)?;
fs::write(&self.path, contents).await?;
Ok(())
}
/// Return all known devices.
pub fn list(&self) -> &[Device] {
&self.devices
}
/// Upsert discovered devices, preserving UUIDs for known entries.
pub fn merge_discovered(&mut self, discovered: Vec<DeviceInfo>) -> Vec<Device> {
discovered
.into_iter()
.map(|info| self.upsert_device(info, None))
.collect()
}
/// Add or update a manually specified device after it has been probed.
pub fn merge_manual(&mut self, info: DeviceInfo, name: Option<String>) -> Device {
self.upsert_device(info, name)
}
/// Set the default device by UUID or name.
pub fn set_default(&mut self, target: &str) -> Option<Device> {
let selected = self.find(target)?.id;
let mut selected_device = None;
for device in &mut self.devices {
let is_match = device.id == selected;
device.is_default = is_match;
if is_match {
selected_device = Some(device.clone());
}
}
selected_device
}
/// Find a device by UUID or case-insensitive name.
pub fn find(&self, target: &str) -> Option<&Device> {
let target_uuid = Uuid::parse_str(target).ok();
let normalized = target.to_ascii_lowercase();
self.devices.iter().find(|device| {
target_uuid.map(|uuid| device.id == uuid).unwrap_or(false)
|| device.name.to_ascii_lowercase() == normalized
})
}
/// Return the current default device, if any.
pub fn default_device(&self) -> Option<&Device> {
self.devices.iter().find(|device| device.is_default)
}
/// Remove a device by UUID or case-insensitive name.
pub fn remove(&mut self, target: &str) -> Option<Device> {
let index = self
.devices
.iter()
.position(|device| matches_target(device, target))?;
let removed = self.devices.remove(index);
self.ensure_default();
Some(removed)
}
/// Ensure the registry's default marker is valid and singular.
pub fn ensure_default(&mut self) {
if self.devices.is_empty() {
return;
}
let Some(default_index) = self.devices.iter().position(|device| device.is_default) else {
self.devices[0].is_default = true;
return;
};
for (index, device) in self.devices.iter_mut().enumerate() {
device.is_default = index == default_index;
}
}
fn upsert_device(&mut self, info: DeviceInfo, name: Option<String>) -> Device {
let DeviceInfo {
name: original_name,
platform,
address,
port,
} = info;
let now = Utc::now();
if let Some(device) = self.find_platform_address_mut(&platform, address) {
device.port = port;
device.original_name = original_name.clone();
if let Some(name) = name {
device.name = name;
}
device.last_seen = now;
return device.clone();
}
let is_default = self.devices.is_empty();
let device = Device {
id: Uuid::new_v4(),
name: name.unwrap_or_else(|| original_name.clone()),
original_name,
platform,
address,
port,
is_default,
discovered_at: now,
last_seen: now,
};
self.devices.push(device.clone());
self.ensure_default();
device
}
fn find_platform_address_mut(
&mut self,
platform: &str,
address: IpAddr,
) -> Option<&mut Device> {
self.devices
.iter_mut()
.find(|device| device.platform == platform && device.address == address)
}
}
impl Default for DeviceRegistry {
fn default() -> Self {
Self {
path: PathBuf::from("devices.json"),
devices: Vec::new(),
}
}
}
/// A registry of platform adapters available to the daemon.
#[derive(Debug, Clone)]
pub struct AdapterRegistry {
roku: RokuAdapter,
}
impl Default for AdapterRegistry {
fn default() -> Self {
Self {
roku: RokuAdapter::new(),
}
}
}
impl AdapterRegistry {
/// Return the supported platform names.
pub fn supported_platforms(&self) -> Vec<&'static str> {
vec!["roku"]
}
/// Discover candidate devices for one platform.
pub async fn discover(&self, platform: &str) -> anyhow::Result<Vec<DeviceInfo>> {
match platform {
"roku" => Ok(self.roku.discover().await?),
other => anyhow::bail!("unsupported platform '{other}'"),
}
}
/// Return true when a platform is supported.
pub fn supports(&self, platform: &str) -> bool {
self.supported_platforms().contains(&platform)
}
/// Probe a manually specified device to verify it matches the requested platform.
pub async fn probe_manual(
&self,
platform: &str,
address: IpAddr,
port: Option<u16>,
) -> anyhow::Result<DeviceInfo> {
match platform {
"roku" => Ok(self
.roku
.probe_device(address, port.unwrap_or(8060))
.await?),
other => anyhow::bail!("unsupported platform '{other}'"),
}
}
/// Return apps from a concrete device using its platform adapter.
pub async fn list_apps(
&self,
device: &Device,
) -> anyhow::Result<Vec<crate::adapters::AppInfo>> {
match device.platform.as_str() {
"roku" => Ok(self.roku.list_apps(device).await?),
other => anyhow::bail!("unsupported platform '{other}'"),
}
}
}
fn matches_target(device: &Device, target: &str) -> bool {
let target_uuid = Uuid::parse_str(target).ok();
let normalized = target.to_ascii_lowercase();
target_uuid.map(|uuid| device.id == uuid).unwrap_or(false)
|| device.name.to_ascii_lowercase() == normalized
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn registry_round_trips_and_preserves_ids() {
let temp_dir = tempfile::tempdir().expect("temp dir should exist");
let path = temp_dir.path().join("devices.json");
let mut registry = DeviceRegistry::load(path.clone())
.await
.expect("registry should load");
let first = registry.merge_discovered(vec![DeviceInfo {
name: "Living Room".to_string(),
platform: "roku".to_string(),
address: "10.0.0.5".parse().expect("valid ip"),
port: 8060,
}]);
let first_id = first[0].id;
registry.save().await.expect("registry should save");
let mut loaded = DeviceRegistry::load(path)
.await
.expect("registry should reload");
let second = loaded.merge_discovered(vec![DeviceInfo {
name: "Living Room Roku".to_string(),
platform: "roku".to_string(),
address: "10.0.0.5".parse().expect("valid ip"),
port: 8060,
}]);
assert_eq!(second[0].id, first_id);
assert_eq!(second[0].original_name, "Living Room Roku");
}
#[test]
fn set_default_matches_case_insensitive_names() {
let now = Utc::now();
let mut registry = DeviceRegistry {
path: PathBuf::from("devices.json"),
devices: vec![
Device {
id: Uuid::new_v4(),
name: "Living Room".to_string(),
original_name: "Living Room".to_string(),
platform: "roku".to_string(),
address: "10.0.0.5".parse().expect("valid ip"),
port: 8060,
is_default: false,
discovered_at: now,
last_seen: now,
},
Device {
id: Uuid::new_v4(),
name: "Bedroom".to_string(),
original_name: "Bedroom".to_string(),
platform: "roku".to_string(),
address: "10.0.0.6".parse().expect("valid ip"),
port: 8060,
is_default: false,
discovered_at: now,
last_seen: now,
},
],
};
let selected = registry
.set_default("living room")
.expect("device should exist");
assert_eq!(selected.name, "Living Room");
assert_eq!(
registry.default_device().map(|device| device.name.as_str()),
Some("Living Room")
);
}
#[test]
fn merge_manual_updates_existing_device_without_replacing_id() {
let now = Utc::now();
let id = Uuid::new_v4();
let mut registry = DeviceRegistry {
path: PathBuf::from("devices.json"),
devices: vec![Device {
id,
name: "Office".to_string(),
original_name: "Office Roku".to_string(),
platform: "roku".to_string(),
address: "10.0.0.9".parse().expect("valid ip"),
port: 8060,
is_default: true,
discovered_at: now,
last_seen: now,
}],
};
let merged = registry.merge_manual(
DeviceInfo {
name: "Upstairs Roku".to_string(),
platform: "roku".to_string(),
address: "10.0.0.9".parse().expect("valid ip"),
port: 8061,
},
Some("Bedroom".to_string()),
);
assert_eq!(merged.id, id);
assert_eq!(merged.name, "Bedroom");
assert_eq!(merged.original_name, "Upstairs Roku");
assert_eq!(merged.port, 8061);
}
#[test]
fn remove_promotes_another_default_when_needed() {
let now = Utc::now();
let living_room_id = Uuid::new_v4();
let mut registry = DeviceRegistry {
path: PathBuf::from("devices.json"),
devices: vec![
Device {
id: living_room_id,
name: "Living Room".to_string(),
original_name: "Living Room".to_string(),
platform: "roku".to_string(),
address: "10.0.0.5".parse().expect("valid ip"),
port: 8060,
is_default: true,
discovered_at: now,
last_seen: now,
},
Device {
id: Uuid::new_v4(),
name: "Bedroom".to_string(),
original_name: "Bedroom".to_string(),
platform: "roku".to_string(),
address: "10.0.0.6".parse().expect("valid ip"),
port: 8060,
is_default: false,
discovered_at: now,
last_seen: now,
},
],
};
let removed = registry
.remove(&living_room_id.to_string())
.expect("device should be removed");
assert_eq!(removed.name, "Living Room");
assert_eq!(
registry.default_device().map(|device| device.name.as_str()),
Some("Bedroom")
);
}
}
+12
View File
@@ -10,3 +10,15 @@ pub struct StateCache {
/// State entries keyed by device UUID. /// State entries keyed by device UUID.
pub entries: HashMap<Uuid, DeviceState>, pub entries: HashMap<Uuid, DeviceState>,
} }
impl StateCache {
/// Insert or replace the last known state for a device.
pub fn insert(&mut self, state: DeviceState) {
self.entries.insert(state.device_id, state);
}
/// Read the last known state for a device.
pub fn get(&self, device_id: Uuid) -> Option<&DeviceState> {
self.entries.get(&device_id)
}
}
+5 -2
View File
@@ -1,6 +1,6 @@
/// Launch the tvctl binary entry point. /// Launch the tvctl binary entry point.
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() {
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_env_filter( .with_env_filter(
tracing_subscriber::EnvFilter::from_default_env() tracing_subscriber::EnvFilter::from_default_env()
@@ -10,5 +10,8 @@ async fn main() -> anyhow::Result<()> {
.compact() .compact()
.init(); .init();
tvctl::cli::run().await if let Err(error) = tvctl::cli::run().await {
eprintln!("{error}");
std::process::exit(1);
}
} }