feat: add HTTP API and integration coverage

Expose the daemon request surface over /v1 with Axum, reuse shared key
parsing between CLI and HTTP, and add an isolated end-to-end HTTP test
that boots a real daemon process with temp XDG paths.
This commit is contained in:
44r0n7
2026-04-15 15:40:50 -04:00
parent 45620b1ab5
commit b8a0a0ff16
9 changed files with 865 additions and 68 deletions
+43
View File
@@ -202,3 +202,46 @@ pub enum TvError {
#[error("i/o error: {0}")]
Io(#[from] std::io::Error),
}
/// Parse a normalized TV key name used by the CLI and HTTP API.
pub fn parse_normalized_tv_key(input: &str) -> Result<TvKey> {
if let Some(literal) = input.strip_prefix("literal:") {
return Ok(TvKey::Literal(literal.to_string()));
}
match input.to_ascii_lowercase().as_str() {
"home" => Ok(TvKey::Home),
"back" => Ok(TvKey::Back),
"up" => Ok(TvKey::Up),
"down" => Ok(TvKey::Down),
"left" => Ok(TvKey::Left),
"right" => Ok(TvKey::Right),
"select" => Ok(TvKey::Select),
"play" => Ok(TvKey::Play),
"pause" => Ok(TvKey::Pause),
"play-pause" => Ok(TvKey::PlayPause),
"stop" => Ok(TvKey::Stop),
"rewind" => Ok(TvKey::Rewind),
"fast-forward" => Ok(TvKey::FastForward),
"replay" => Ok(TvKey::Replay),
"skip" => Ok(TvKey::Skip),
"channel-up" => Ok(TvKey::ChannelUp),
"channel-down" => Ok(TvKey::ChannelDown),
"volume-up" => Ok(TvKey::VolumeUp),
"volume-down" => Ok(TvKey::VolumeDown),
"mute" => Ok(TvKey::Mute),
"power" => Ok(TvKey::Power),
"power-on" => Ok(TvKey::PowerOn),
"power-off" => Ok(TvKey::PowerOff),
"input-hdmi1" => Ok(TvKey::InputHdmi1),
"input-hdmi2" => Ok(TvKey::InputHdmi2),
"input-hdmi3" => Ok(TvKey::InputHdmi3),
"input-hdmi4" => Ok(TvKey::InputHdmi4),
"input-av" => Ok(TvKey::InputAv),
"input-tuner" => Ok(TvKey::InputTuner),
"search" => Ok(TvKey::Search),
"info" => Ok(TvKey::Info),
"options" => Ok(TvKey::Options),
_ => Err(TvError::InvalidKey(input.to_string())),
}
}
+511 -11
View File
@@ -1,37 +1,537 @@
use axum::Router;
use serde::Serialize;
use std::collections::BTreeMap;
/// Create the placeholder HTTP router for the tvctl API.
pub fn router() -> Router {
use axum::{
Json, Router,
extract::{Multipart, Path, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::{get, post},
};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio::{
fs,
io::{AsyncReadExt, AsyncWriteExt},
net::UnixStream,
};
use crate::{
adapters::{Device, TvKey, parse_normalized_tv_key},
daemon::{
SharedDaemon, config::TvctlConfig,
ipc::{
AppListResult, ConfigReloadResult, DaemonRequest, DaemonResponse, DaemonStatus,
DevLogsResult, DiscoveryResult, StateResult,
},
},
};
/// Create the HTTP router for the tvctl API.
pub fn router(daemon: SharedDaemon) -> Router {
Router::new()
.route("/v1/devices", get(list_devices))
.route("/v1/devices/discover", post(discover_devices))
.route("/v1/devices/{id}", get(get_device).delete(delete_device))
.route("/v1/devices/{id}/state", get(get_state))
.route("/v1/devices/{id}/apps", get(list_apps))
.route("/v1/devices/{id}/apps/launch", post(launch_app))
.route("/v1/devices/{id}/apps/stop", post(stop_app))
.route("/v1/devices/{id}/apps/refresh", post(refresh_apps))
.route("/v1/devices/{id}/remote/key", post(send_key))
.route("/v1/devices/{id}/remote/sequence", post(send_sequence))
.route("/v1/devices/{id}/dev/install", post(dev_install))
.route("/v1/devices/{id}/dev/reload", post(dev_reload))
.route("/v1/devices/{id}/dev/logs", get(dev_logs))
.route("/v1/daemon/status", get(daemon_status))
.route("/v1/config", get(get_config).patch(patch_config))
.route("/v1/config/reload", post(reload_config))
.with_state(daemon)
}
/// The standard success envelope for API responses.
#[derive(Debug, Clone, Serialize)]
pub struct SuccessEnvelope<T> {
/// Indicates a successful operation.
pub ok: bool,
/// The payload returned by the request.
pub data: T,
}
/// The standard error envelope for API responses.
#[derive(Debug, Clone, Serialize)]
pub struct ErrorEnvelope {
/// Indicates the request failed.
pub ok: bool,
/// The structured error payload.
pub error: ApiError,
}
/// A machine-readable API error returned to clients.
#[derive(Debug, Clone, Serialize)]
pub struct ApiError {
/// Stable, snake_case error identifier.
pub code: String,
/// Human-readable summary of the failure.
pub message: String,
/// Suggested next action for the caller.
#[serde(skip_serializing_if = "Option::is_none")]
pub hint: Option<String>,
}
#[derive(Debug, Deserialize)]
struct LaunchAppBody {
app: String,
}
#[derive(Debug, Deserialize)]
struct RefreshAppsBody {
#[serde(default)]
clear: bool,
}
#[derive(Debug, Deserialize)]
struct SendKeyBody {
key: String,
}
#[derive(Debug, Deserialize)]
struct SendSequenceBody {
keys: Vec<String>,
#[serde(default = "default_remote_delay_ms")]
delay_ms: u64,
}
fn default_remote_delay_ms() -> u64 {
200
}
async fn list_devices(State(daemon): State<SharedDaemon>) -> Response {
execute_json::<Vec<Device>>(daemon, DaemonRequest::ListDevices).await
}
async fn discover_devices(State(daemon): State<SharedDaemon>) -> Response {
execute_json::<DiscoveryResult>(daemon, DaemonRequest::Discover).await
}
async fn get_device(Path(id): Path<String>, State(daemon): State<SharedDaemon>) -> Response {
execute_json::<Device>(daemon, DaemonRequest::GetDevice { target: id }).await
}
async fn delete_device(Path(id): Path<String>, State(daemon): State<SharedDaemon>) -> Response {
execute_json::<Device>(daemon, DaemonRequest::RemoveDevice { target: id }).await
}
async fn get_state(Path(id): Path<String>, State(daemon): State<SharedDaemon>) -> Response {
execute_json::<StateResult>(
daemon,
DaemonRequest::GetState {
device: Some(id),
},
)
.await
}
async fn list_apps(Path(id): Path<String>, State(daemon): State<SharedDaemon>) -> Response {
execute_json::<AppListResult>(
daemon,
DaemonRequest::ListApps {
device: Some(id),
platform: None,
},
)
.await
}
async fn launch_app(
Path(id): Path<String>,
State(daemon): State<SharedDaemon>,
Json(body): Json<LaunchAppBody>,
) -> Response {
execute_json_value(
daemon,
DaemonRequest::LaunchApp {
device: Some(id),
app: body.app,
},
)
.await
}
async fn stop_app(Path(id): Path<String>, State(daemon): State<SharedDaemon>) -> Response {
execute_json_value(
daemon,
DaemonRequest::StopApp {
device: Some(id),
},
)
.await
}
async fn refresh_apps(
Path(id): Path<String>,
State(daemon): State<SharedDaemon>,
body: Option<Json<RefreshAppsBody>>,
) -> Response {
execute_json_value(
daemon,
DaemonRequest::RefreshApps {
device: Some(id),
clear: body.map(|value| value.clear).unwrap_or(false),
},
)
.await
}
async fn send_key(
Path(id): Path<String>,
State(daemon): State<SharedDaemon>,
Json(body): Json<SendKeyBody>,
) -> Response {
let key = match parse_key(&body.key) {
Ok(key) => key,
Err(response) => return response,
};
execute_json_value(
daemon,
DaemonRequest::SendKey {
device: Some(id),
key,
},
)
.await
}
async fn send_sequence(
Path(id): Path<String>,
State(daemon): State<SharedDaemon>,
Json(body): Json<SendSequenceBody>,
) -> Response {
let mut parsed = Vec::with_capacity(body.keys.len());
for key in body.keys {
match parse_key(&key) {
Ok(key) => parsed.push(key),
Err(response) => return response,
}
}
execute_json_value(
daemon,
DaemonRequest::SendSequence {
device: Some(id),
keys: parsed,
delay_ms: body.delay_ms,
},
)
.await
}
async fn dev_install(
Path(id): Path<String>,
State(daemon): State<SharedDaemon>,
mut multipart: Multipart,
) -> Response {
let mut archive = None;
while let Ok(Some(field)) = multipart.next_field().await {
let name = field.name().unwrap_or_default().to_string();
if name == "archive" || archive.is_none() {
match field.bytes().await {
Ok(bytes) => {
archive = Some(bytes);
if name == "archive" {
break;
}
}
Err(error) => {
return api_error(
StatusCode::BAD_REQUEST,
"invalid_multipart",
format!("Failed to read uploaded archive: {error}"),
Some("Upload a valid zip file as multipart field `archive`.".to_string()),
);
}
}
}
}
let Some(archive) = archive else {
return api_error(
StatusCode::BAD_REQUEST,
"missing_archive",
"Missing multipart field `archive`.",
Some("Upload a zip file in multipart field `archive`.".to_string()),
);
};
let zip_path = std::env::temp_dir().join(format!("tvctl-http-{}.zip", uuid::Uuid::new_v4()));
if let Err(error) = fs::write(&zip_path, &archive).await {
return api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"dev_upload_write_failed",
format!("Failed to stage uploaded archive: {error}"),
Some("Check temp directory permissions and retry.".to_string()),
);
}
let response = execute_json_value(
daemon,
DaemonRequest::DevInstall {
device: Some(id),
zip_path: zip_path.display().to_string(),
},
)
.await;
let _ = fs::remove_file(&zip_path).await;
response
}
async fn dev_reload(Path(id): Path<String>, State(daemon): State<SharedDaemon>) -> Response {
execute_json_value(
daemon,
DaemonRequest::DevReload {
device: Some(id),
},
)
.await
}
async fn dev_logs(Path(id): Path<String>, State(daemon): State<SharedDaemon>) -> Response {
execute_json::<DevLogsResult>(
daemon,
DaemonRequest::DevLogs {
device: Some(id),
},
)
.await
}
async fn daemon_status(State(daemon): State<SharedDaemon>) -> Response {
execute_json::<DaemonStatus>(daemon, DaemonRequest::Ping).await
}
async fn get_config() -> Response {
match TvctlConfig::load().await {
Ok(mut config) => {
if !config.dev.roku_password.is_empty() {
config.dev.roku_password = "<redacted>".to_string();
}
api_success(StatusCode::OK, config)
}
Err(error) => api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"config_load_failed",
format!("Failed to load tvctl config: {error}"),
Some("Inspect ~/.config/tvctl/config.toml for invalid TOML.".to_string()),
),
}
}
async fn patch_config(
State(daemon): State<SharedDaemon>,
Json(body): Json<BTreeMap<String, Value>>,
) -> Response {
let path = crate::daemon::config::default_config_path();
let mut config = match TvctlConfig::load_from_path(&path).await {
Ok(config) => config,
Err(error) => {
return api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"config_load_failed",
format!("Failed to load tvctl config: {error}"),
Some("Inspect ~/.config/tvctl/config.toml for invalid TOML.".to_string()),
);
}
};
for (key, value) in body {
let value = match json_value_to_string(value) {
Some(value) => value,
None => {
return api_error(
StatusCode::BAD_REQUEST,
"invalid_config_value",
format!("Config value for '{key}' must be a string, number, boolean, or null."),
Some("Use flat key/value JSON such as {\"daemon.http_port\": 7272}.".to_string()),
);
}
};
if let Err(error) = config.set_value(&key, &value) {
return api_error(
StatusCode::BAD_REQUEST,
"invalid_config_patch",
format!("Failed to set config value: {error}"),
Some("Run `tvctl config list` to see supported keys and value shapes.".to_string()),
);
}
}
if let Err(error) = config.save_to_path(&path).await {
return api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"config_save_failed",
format!("Failed to save tvctl config: {error}"),
Some("Check write permissions for ~/.config/tvctl/config.toml.".to_string()),
);
}
let response = execute_json_value(daemon, DaemonRequest::ReloadConfig).await;
response
}
async fn reload_config(State(daemon): State<SharedDaemon>) -> Response {
execute_json::<ConfigReloadResult>(daemon, DaemonRequest::ReloadConfig).await
}
async fn execute_json<T>(daemon: SharedDaemon, request: DaemonRequest) -> Response
where
T: Serialize + DeserializeOwned,
{
match send_daemon_request(daemon, &request).await {
Ok(response) => from_daemon_response::<T>(response),
Err(response) => response,
}
}
async fn execute_json_value(daemon: SharedDaemon, request: DaemonRequest) -> Response {
match send_daemon_request(daemon, &request).await {
Ok(response) => from_daemon_response::<Value>(response),
Err(response) => response,
}
}
fn from_daemon_response<T>(response: DaemonResponse) -> Response
where
T: Serialize + DeserializeOwned,
{
if let Some(error) = response.error {
return api_error(status_for_error(&error.code), error.code, error.message, error.hint);
}
let data = response.data.unwrap_or(Value::Null);
match serde_json::from_value::<T>(data) {
Ok(data) => api_success(StatusCode::OK, data),
Err(error) => api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"invalid_daemon_payload",
format!("Failed to decode daemon payload: {error}"),
Some("Ensure the HTTP API and daemon are from the same build.".to_string()),
),
}
}
fn api_success<T>(status: StatusCode, data: T) -> Response
where
T: Serialize,
{
(status, Json(SuccessEnvelope { ok: true, data })).into_response()
}
fn api_error(
status: StatusCode,
code: impl Into<String>,
message: impl Into<String>,
hint: Option<String>,
) -> Response {
(
status,
Json(ErrorEnvelope {
ok: false,
error: ApiError {
code: code.into(),
message: message.into(),
hint,
},
}),
)
.into_response()
}
fn parse_key(input: &str) -> Result<TvKey, Response> {
parse_normalized_tv_key(input).map_err(|_| {
api_error(
StatusCode::BAD_REQUEST,
"invalid_key",
format!("Unknown key '{input}'."),
Some("Use a normalized key like `home`, `down`, `volume-up`, or `literal:text`.".to_string()),
)
})
}
fn json_value_to_string(value: Value) -> Option<String> {
match value {
Value::Null => Some(String::new()),
Value::Bool(value) => Some(value.to_string()),
Value::Number(value) => Some(value.to_string()),
Value::String(value) => Some(value),
_ => None,
}
}
fn status_for_error(code: &str) -> StatusCode {
match code {
"device_not_found" | "no_default_device" => StatusCode::NOT_FOUND,
"invalid_request"
| "platform_mismatch"
| "app_launch_ambiguous"
| "invalid_key"
| "invalid_config_patch"
| "invalid_config_value"
| "missing_archive"
| "invalid_multipart" => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
async fn send_daemon_request(
daemon: SharedDaemon,
request: &DaemonRequest,
) -> Result<DaemonResponse, Response> {
let socket_path = {
let guard = daemon.lock().await;
guard.paths.socket_file.clone()
};
let mut stream = UnixStream::connect(&socket_path).await.map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_socket_unreachable",
format!("Unable to reach tvctld at {}: {error}", socket_path.display()),
Some("Check whether the daemon socket is writable and the daemon is running.".to_string()),
)
})?;
let bytes = serde_json::to_vec(request).map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"request_encode_failed",
format!("Failed to encode daemon request: {error}"),
Some("Ensure the HTTP API and daemon are from the same build.".to_string()),
)
})?;
stream.write_all(&bytes).await.map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_write_failed",
format!("Failed to write request to tvctld: {error}"),
Some("Retry the request after checking the daemon state.".to_string()),
)
})?;
stream.shutdown().await.map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_shutdown_failed",
format!("Failed to finish the daemon request: {error}"),
Some("Retry the request after restarting the daemon.".to_string()),
)
})?;
let mut response_bytes = Vec::new();
stream.read_to_end(&mut response_bytes).await.map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_read_failed",
format!("Failed to read the daemon response: {error}"),
Some("Retry the request after restarting the daemon.".to_string()),
)
})?;
serde_json::from_slice::<DaemonResponse>(&response_bytes).map_err(|error| {
api_error(
StatusCode::INTERNAL_SERVER_ERROR,
"daemon_response_invalid",
format!("Failed to decode the daemon response: {error}"),
Some("Ensure the HTTP API and daemon are from the same build.".to_string()),
)
})
}
+5 -41
View File
@@ -12,7 +12,7 @@ use tokio::{
};
use crate::{
adapters::{AppInfo, Device, DeviceState, TvKey},
adapters::{AppInfo, Device, DeviceState, TvKey, parse_normalized_tv_key},
daemon::{
self,
config::{RuntimePaths, TvctlConfig, default_config_path, systemd_unit_path},
@@ -1409,46 +1409,10 @@ fn is_secret_config_key(key: &str) -> bool {
}
fn parse_tv_key(input: &str) -> Result<TvKey, CliError> {
if let Some(literal) = input.strip_prefix("literal:") {
return Ok(TvKey::Literal(literal.to_string()));
}
match input.to_ascii_lowercase().as_str() {
"home" => Ok(TvKey::Home),
"back" => Ok(TvKey::Back),
"up" => Ok(TvKey::Up),
"down" => Ok(TvKey::Down),
"left" => Ok(TvKey::Left),
"right" => Ok(TvKey::Right),
"select" => Ok(TvKey::Select),
"play" => Ok(TvKey::Play),
"pause" => Ok(TvKey::Pause),
"play-pause" => Ok(TvKey::PlayPause),
"stop" => Ok(TvKey::Stop),
"rewind" => Ok(TvKey::Rewind),
"fast-forward" => Ok(TvKey::FastForward),
"replay" => Ok(TvKey::Replay),
"skip" => Ok(TvKey::Skip),
"channel-up" => Ok(TvKey::ChannelUp),
"channel-down" => Ok(TvKey::ChannelDown),
"volume-up" => Ok(TvKey::VolumeUp),
"volume-down" => Ok(TvKey::VolumeDown),
"mute" => Ok(TvKey::Mute),
"power" => Ok(TvKey::Power),
"power-on" => Ok(TvKey::PowerOn),
"power-off" => Ok(TvKey::PowerOff),
"input-hdmi1" => Ok(TvKey::InputHdmi1),
"input-hdmi2" => Ok(TvKey::InputHdmi2),
"input-hdmi3" => Ok(TvKey::InputHdmi3),
"input-hdmi4" => Ok(TvKey::InputHdmi4),
"input-av" => Ok(TvKey::InputAv),
"input-tuner" => Ok(TvKey::InputTuner),
"search" => Ok(TvKey::Search),
"info" => Ok(TvKey::Info),
"options" => Ok(TvKey::Options),
_ => Err(CliError::new(
parse_normalized_tv_key(input).map_err(|_| {
CliError::new(
format!("Unknown key '{input}'."),
"Use a normalized key like `home`, `down`, `volume-up`, or `literal:text`.",
)),
}
)
})
}
+38 -4
View File
@@ -6,6 +6,7 @@ pub mod registry;
pub mod state;
use std::{
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
@@ -25,6 +26,7 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{UnixListener, UnixStream},
sync::Mutex,
task::JoinHandle,
time::{self, MissedTickBehavior, sleep},
};
use tracing::warn;
@@ -33,6 +35,9 @@ use tracing::warn;
use std::os::unix::fs::PermissionsExt;
use crate::adapters::{Device, TvKey};
use crate::api;
pub type SharedDaemon = Arc<Mutex<Daemon>>;
/// The long-lived tvctld process.
#[derive(Debug)]
@@ -85,7 +90,7 @@ impl Daemon {
/// Run the long-lived daemon loop over a Unix socket.
pub async fn serve() -> anyhow::Result<()> {
let daemon = Arc::new(Mutex::new(Daemon::load().await?));
let daemon: SharedDaemon = Arc::new(Mutex::new(Daemon::load().await?));
{
let mut guard = daemon.lock().await;
@@ -101,6 +106,7 @@ pub async fn serve() -> anyhow::Result<()> {
guard.config.discovery.interval_secs,
)
};
let http_server = start_http_server_if_enabled(daemon.clone()).await?;
if let Some(parent) = socket_path.parent() {
fs::create_dir_all(parent).await?;
@@ -163,6 +169,9 @@ pub async fn serve() -> anyhow::Result<()> {
let guard = daemon.lock().await;
let _ = fs::remove_file(&guard.paths.active_socket_file).await;
}
if let Some(task) = http_server {
task.abort();
}
Ok(())
}
@@ -174,6 +183,31 @@ async fn set_socket_permissions(path: &Path) -> anyhow::Result<()> {
Ok(())
}
async fn start_http_server_if_enabled(daemon: SharedDaemon) -> anyhow::Result<Option<JoinHandle<()>>> {
let (enabled, host, port) = {
let guard = daemon.lock().await;
(
guard.config.daemon.http_enabled,
guard.config.daemon.http_host.clone(),
guard.config.daemon.http_port,
)
};
if !enabled {
return Ok(None);
}
let address: SocketAddr = format!("{host}:{port}").parse()?;
let listener = tokio::net::TcpListener::bind(address).await?;
let app = api::router(daemon);
let task = tokio::spawn(async move {
if let Err(error) = axum::serve(listener, app).await {
warn!("HTTP API server stopped: {error}");
}
});
Ok(Some(task))
}
async fn handle_connection(
mut stream: UnixStream,
daemon: Arc<Mutex<Daemon>>,
@@ -194,7 +228,7 @@ async fn handle_connection(
}
};
let (response, should_stop) = handle_request(request, daemon).await;
let (response, should_stop) = execute_request(request, daemon).await;
write_response(&mut stream, &response).await?;
Ok(should_stop)
}
@@ -206,9 +240,9 @@ async fn write_response(stream: &mut UnixStream, response: &DaemonResponse) -> a
Ok(())
}
async fn handle_request(
pub(crate) async fn execute_request(
request: DaemonRequest,
daemon: Arc<Mutex<Daemon>>,
daemon: SharedDaemon,
) -> (DaemonResponse, bool) {
match request {
DaemonRequest::Ping => {