feat: finalize HTTP direct dispatch refactor and pending milestone updates

Switch API execution to direct daemon request handling, add regression
coverage for non-socket HTTP dispatch, and include the remaining pending
local updates across CLI/daemon/docs from the current worktree.
This commit is contained in:
44r0n7
2026-04-18 11:42:10 -04:00
parent 26cc0c973a
commit 274844b558
14 changed files with 1131 additions and 108 deletions
+71 -79
View File
@@ -9,16 +9,12 @@ use axum::{
};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio::{
fs,
io::{AsyncReadExt, AsyncWriteExt},
net::UnixStream,
};
use tokio::fs;
use crate::{
adapters::{Device, TvKey, parse_normalized_tv_key},
daemon::{
SharedDaemon,
self, SharedDaemon,
config::TvctlConfig,
ipc::{
AppListResult, ConfigReloadResult, DaemonRequest, DaemonResponse, DaemonStatus,
@@ -355,17 +351,13 @@ async fn execute_json<T>(daemon: SharedDaemon, request: DaemonRequest) -> Respon
where
T: Serialize + DeserializeOwned,
{
match send_daemon_request(daemon, &request).await {
Ok(response) => from_daemon_response::<T>(response),
Err(response) => response,
}
let response = execute_daemon_request(daemon, request).await;
from_daemon_response::<T>(response)
}
async fn execute_json_value(daemon: SharedDaemon, request: DaemonRequest) -> Response {
match send_daemon_request(daemon, &request).await {
Ok(response) => from_daemon_response::<Value>(response),
Err(response) => response,
}
let response = execute_daemon_request(daemon, request).await;
from_daemon_response::<Value>(response)
}
fn from_daemon_response<T>(response: DaemonResponse) -> Response
@@ -459,74 +451,74 @@ fn status_for_error(code: &str) -> StatusCode {
}
}
async fn send_daemon_request(
daemon: SharedDaemon,
request: &DaemonRequest,
) -> Result<DaemonResponse, Response> {
let socket_path = {
let guard = daemon.lock().await;
guard.paths.socket_file.clone()
async fn execute_daemon_request(daemon: SharedDaemon, request: DaemonRequest) -> DaemonResponse {
let (response, should_stop) = daemon::execute_request(request, daemon).await;
if should_stop {
return DaemonResponse::error(
"invalid_http_request",
"HTTP API routes cannot trigger daemon shutdown.",
Some("Use the Unix socket daemon protocol or CLI for lifecycle control.".to_string()),
);
}
response
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use tokio::sync::Mutex;
use super::execute_daemon_request;
use crate::daemon::{
Daemon,
cache::AppCacheStore,
config::{RuntimePaths, TvctlConfig},
discovery::DiscoveryService,
ipc::DaemonRequest,
registry::{AdapterRegistry, DeviceRegistry},
state::StateCache,
};
let mut stream = UnixStream::connect(&socket_path).await.map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_socket_unreachable",
format!(
"Unable to reach tvctld at {}: {error}",
socket_path.display()
),
Some(
"Check whether the daemon socket is writable and the daemon is running."
.to_string(),
),
)
})?;
async fn build_test_daemon() -> Arc<Mutex<Daemon>> {
let temp_dir = tempfile::tempdir().expect("temp dir should exist");
let root = temp_dir.path();
let data_dir = root.join("data");
std::fs::create_dir_all(&data_dir).expect("data dir should exist");
let bytes = serde_json::to_vec(request).map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"request_encode_failed",
format!("Failed to encode daemon request: {error}"),
Some("Ensure the HTTP API and daemon are from the same build.".to_string()),
)
})?;
stream.write_all(&bytes).await.map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_write_failed",
format!("Failed to write request to tvctld: {error}"),
Some("Retry the request after checking the daemon state.".to_string()),
)
})?;
stream.shutdown().await.map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_shutdown_failed",
format!("Failed to finish the daemon request: {error}"),
Some("Retry the request after restarting the daemon.".to_string()),
)
})?;
let paths = RuntimePaths {
config_file: root.join("config.toml"),
data_dir: data_dir.clone(),
devices_file: data_dir.join("devices.json"),
cache_dir: data_dir.join("cache"),
active_socket_file: data_dir.join("active_socket"),
socket_file: root.join("tvctl.sock"),
};
let config = TvctlConfig::default();
let adapters = AdapterRegistry::from_config(&config);
let registry = DeviceRegistry::load(paths.devices_file.clone())
.await
.expect("registry should load");
let mut response_bytes = Vec::new();
stream
.read_to_end(&mut response_bytes)
.await
.map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_read_failed",
format!("Failed to read the daemon response: {error}"),
Some("Retry the request after restarting the daemon.".to_string()),
)
})?;
Arc::new(Mutex::new(Daemon {
config,
paths: paths.clone(),
registry,
app_cache: AppCacheStore::new(paths.cache_dir.clone()),
state_cache: StateCache::default(),
adapters: adapters.clone(),
discovery: DiscoveryService::new(adapters),
}))
}
serde_json::from_slice::<DaemonResponse>(&response_bytes).map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_response_invalid",
format!("Failed to decode the daemon response: {error}"),
Some("Ensure the HTTP API and daemon are from the same build.".to_string()),
)
})
#[tokio::test]
async fn http_dispatch_rejects_shutdown_requests() {
let daemon = build_test_daemon().await;
let response = execute_daemon_request(daemon, DaemonRequest::Shutdown).await;
let error = response
.error
.expect("shutdown should be rejected via HTTP");
assert_eq!(error.code, "invalid_http_request");
}
}
+7 -3
View File
@@ -311,8 +311,8 @@ struct ServiceUninstallResult {
pub async fn run() -> Result<(), CliError> {
let cli = Cli::parse();
let Some(command) = cli.command.clone() else {
let mut cmd = Cli::command();
cmd.print_long_help().map_err(|error| {
let mut command = build_cli_command();
command.print_long_help().map_err(|error| {
CliError::new(
format!("Failed to render CLI help: {error}"),
"Retry the command or inspect the terminal output.",
@@ -338,7 +338,7 @@ pub async fn run() -> Result<(), CliError> {
}
fn handle_completion_command(shell: CompletionShell) -> Result<(), CliError> {
let mut command = Cli::command();
let mut command = build_cli_command();
match shell {
CompletionShell::Bash => print_completions(Shell::Bash, &mut command),
CompletionShell::Zsh => print_completions(Shell::Zsh, &mut command),
@@ -356,6 +356,10 @@ fn print_completions<G: Generator>(generator: G, command: &mut clap::Command) {
);
}
fn build_cli_command() -> clap::Command {
Cli::command()
}
async fn handle_daemon_command(cli: &Cli, command: DaemonCommand) -> Result<(), CliError> {
match command {
DaemonCommand::Start => daemon_start(cli).await,
+26 -1
View File
@@ -7,6 +7,7 @@ use std::{
use anyhow::bail;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tracing_subscriber::EnvFilter;
/// The complete daemon configuration loaded from TOML.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
@@ -93,7 +94,10 @@ impl TvctlConfig {
"daemon.http_enabled" => self.daemon.http_enabled = parse_bool(key, value)?,
"daemon.http_port" => self.daemon.http_port = parse_value(key, value)?,
"daemon.http_host" => self.daemon.http_host = value.to_string(),
"daemon.log_level" => self.daemon.log_level = value.to_string(),
"daemon.log_level" => {
validate_log_level(value)?;
self.daemon.log_level = value.to_string();
}
"discovery.auto_discover" => self.discovery.auto_discover = parse_bool(key, value)?,
"discovery.interval_secs" => self.discovery.interval_secs = parse_value(key, value)?,
"discovery.timeout_secs" => self.discovery.timeout_secs = parse_value(key, value)?,
@@ -311,6 +315,13 @@ where
.map_err(|error| anyhow::anyhow!("invalid value '{value}' for {key}: {error}"))
}
fn validate_log_level(value: &str) -> anyhow::Result<()> {
EnvFilter::try_new(value).map_err(|error| {
anyhow::anyhow!("invalid value '{value}' for daemon.log_level: {error}")
})?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@@ -348,4 +359,18 @@ mod tests {
.expect("config should load");
assert_eq!(loaded, config);
}
#[test]
fn rejects_invalid_log_level_values() {
let mut config = TvctlConfig::default();
let error = config
.set_value("daemon.log_level", "[")
.expect_err("invalid log level should fail");
assert!(
error
.to_string()
.contains("invalid value '[' for daemon.log_level"),
"unexpected error: {error}"
);
}
}
+20 -7
View File
@@ -36,6 +36,7 @@ use std::os::unix::fs::PermissionsExt;
use crate::adapters::{Device, TvKey};
use crate::api;
use crate::logging;
pub type SharedDaemon = Arc<Mutex<Daemon>>;
@@ -62,6 +63,7 @@ impl Daemon {
/// Load the daemon's persisted state and adapter registry.
pub async fn load() -> anyhow::Result<Self> {
let config = TvctlConfig::load().await?;
logging::apply_log_level(&config.daemon.log_level).map_err(anyhow::Error::msg)?;
let mut paths = RuntimePaths::detect();
if !config.daemon.socket.is_empty() {
paths.socket_file = PathBuf::from(&config.daemon.socket);
@@ -783,16 +785,26 @@ pub(crate) async fn execute_request(
DaemonRequest::ReloadConfig => {
let mut guard = daemon.lock().await;
match TvctlConfig::load_from_path(&guard.paths.config_file).await {
Ok(config) => {
let restart_required = apply_runtime_config(&mut guard, config);
(
Ok(config) => match apply_runtime_config(&mut guard, config) {
Ok(restart_required) => (
DaemonResponse::success(ConfigReloadResult {
config: guard.config.clone(),
restart_required,
}),
false,
)
}
),
Err(error) => (
DaemonResponse::error(
"config_reload_failed",
format!("Failed to apply reloaded config: {error}"),
Some(
"Inspect ~/.config/tvctl/config.toml and fix invalid values."
.to_string(),
),
),
false,
),
},
Err(error) => (
DaemonResponse::error(
"config_reload_failed",
@@ -943,8 +955,9 @@ fn discovery_interval(interval_secs: u64) -> Option<time::Interval> {
Some(interval)
}
fn apply_runtime_config(daemon: &mut Daemon, config: TvctlConfig) -> Vec<String> {
fn apply_runtime_config(daemon: &mut Daemon, config: TvctlConfig) -> anyhow::Result<Vec<String>> {
let old_config = daemon.config.clone();
logging::apply_log_level(&config.daemon.log_level).map_err(anyhow::Error::msg)?;
daemon.adapters = AdapterRegistry::from_config(&config);
daemon.discovery = DiscoveryService::new(daemon.adapters.clone());
@@ -972,5 +985,5 @@ fn apply_runtime_config(daemon: &mut Daemon, config: TvctlConfig) -> Vec<String>
}
daemon.config = config;
restart_required
Ok(restart_required)
}
+2 -2
View File
@@ -294,7 +294,7 @@ impl AdapterRegistry {
F: for<'a> FnOnce(
&'a RokuAdapter,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = crate::adapters::Result<T>> + 'a>,
Box<dyn std::future::Future<Output = crate::adapters::Result<T>> + Send + 'a>,
>,
{
match platform {
@@ -309,7 +309,7 @@ impl AdapterRegistry {
&'a RokuAdapter,
&'a Device,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = crate::adapters::Result<T>> + 'a>,
Box<dyn std::future::Future<Output = crate::adapters::Result<T>> + Send + 'a>,
>,
{
match device.platform.as_str() {
+1
View File
@@ -4,3 +4,4 @@ pub mod adapters;
pub mod api;
pub mod cli;
pub mod daemon;
pub mod logging;
+45
View File
@@ -0,0 +1,45 @@
use std::sync::OnceLock;
use tracing_subscriber::{
EnvFilter, Registry, fmt, layer::SubscriberExt, reload, util::SubscriberInitExt,
};
type FilterReloadHandle = reload::Handle<EnvFilter, Registry>;
static FILTER_RELOAD_HANDLE: OnceLock<FilterReloadHandle> = OnceLock::new();
/// Initialize global tracing with a runtime-reloadable filter.
pub fn init(default_level: &str) -> anyhow::Result<()> {
let filter = if std::env::var("RUST_LOG").is_ok() {
EnvFilter::try_from_default_env().map_err(|error| {
anyhow::anyhow!("failed to parse RUST_LOG env filter '{default_level}': {error}")
})?
} else {
parse_filter(default_level).map_err(anyhow::Error::msg)?
};
let (filter_layer, handle) = reload::Layer::new(filter);
let _ = FILTER_RELOAD_HANDLE.set(handle);
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt::layer().with_target(false).compact())
.try_init()
.map_err(|error| anyhow::anyhow!("failed to initialize tracing: {error}"))
}
/// Apply a new process log-level filter at runtime.
pub fn apply_log_level(level: &str) -> Result<(), String> {
let filter = parse_filter(level)?;
let handle = FILTER_RELOAD_HANDLE
.get()
.ok_or_else(|| "logging subsystem is not initialized".to_string())?;
handle
.modify(|current_filter| *current_filter = filter)
.map_err(|error| format!("failed to apply log filter: {error}"))
}
fn parse_filter(value: &str) -> Result<EnvFilter, String> {
EnvFilter::try_new(value)
.map_err(|error| format!("invalid value '{value}' for daemon.log_level: {error}"))
}
+4 -8
View File
@@ -1,14 +1,10 @@
/// Launch the tvctl binary entry point.
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive(tracing::Level::INFO.into()),
)
.with_target(false)
.compact()
.init();
if let Err(error) = tvctl::logging::init("info") {
eprintln!("Failed to initialize logging: {error}");
std::process::exit(1);
}
if let Err(error) = tvctl::cli::run().await {
eprintln!("{error}");