From 2e55677e35c06495dfbeacf0d0f96178e4f79ee9 Mon Sep 17 00:00:00 2001 From: 44r0n7 <44r0n7+gitea@pm.me> Date: Thu, 1 Jan 2026 17:29:19 -0500 Subject: [PATCH] ux: show fix phase progress and scan/plan output --- vid-repair-core/src/fix/executor.rs | 181 +++++++++++++++++++++++-- vid-repair/src/main.rs | 202 ++++++++++++++++++++++------ 2 files changed, 331 insertions(+), 52 deletions(-) diff --git a/vid-repair-core/src/fix/executor.rs b/vid-repair-core/src/fix/executor.rs index 295ea84..9352127 100644 --- a/vid-repair-core/src/fix/executor.rs +++ b/vid-repair-core/src/fix/executor.rs @@ -1,15 +1,31 @@ +use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; -use std::process::Command; +use std::process::{Command, Stdio}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; use anyhow::{Context, Result}; use fs_err as fs; use crate::config::Config; use crate::fix::{FixKind, FixOutcome, FixPlan}; -use crate::scan::scan_file; +use crate::scan::{scan_file, DecodeProgress}; use crate::rules::RuleSet; pub fn apply_fix(path: &Path, plan: &FixPlan, config: &Config, ruleset: &RuleSet) -> Result { + apply_fix_with_progress(path, plan, config, ruleset, None, None) +} + +pub fn apply_fix_with_progress( + path: &Path, + plan: &FixPlan, + config: &Config, + ruleset: &RuleSet, + duration: Option, + progress: Option>, +) -> Result { if plan.actions.is_empty() { return Ok(FixOutcome { plan: plan.clone(), @@ -27,7 +43,7 @@ pub fn apply_fix(path: &Path, plan: &FixPlan, config: &Config, ruleset: &RuleSet let action = &plan.actions[0]; let output = prepare_output_path(path, config)?; - run_ffmpeg_fix(path, &output.temp_path, action.kind, config)?; + run_ffmpeg_fix(path, &output.temp_path, action.kind, config, duration, progress)?; let verification = scan_file(&output.temp_path, config, ruleset) .with_context(|| format!("Failed to verify output {}", output.temp_path.display()))?; @@ -55,7 +71,14 @@ pub fn apply_fix(path: &Path, plan: &FixPlan, config: &Config, ruleset: &RuleSet }) } -fn run_ffmpeg_fix(path: &Path, output: &Path, kind: FixKind, config: &Config) -> Result<()> { +fn run_ffmpeg_fix( + path: &Path, + output: &Path, + kind: FixKind, + config: &Config, + duration: Option, + progress: Option>, +) -> Result<()> { let mut cmd = Command::new(&config.ffmpeg_path); cmd.arg("-y").arg("-v").arg("error").arg("-i").arg(path); @@ -76,13 +99,118 @@ fn run_ffmpeg_fix(path: &Path, output: &Path, kind: FixKind, config: &Config) -> cmd.arg(output); - let output = cmd - .output() + if progress.is_none() { + let output = cmd + .output() + .with_context(|| format!("Failed to run ffmpeg fix for {}", path.display()))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("ffmpeg fix failed: {}", stderr.trim()); + } + + return Ok(()); + } + + cmd.arg("-nostats").arg("-progress").arg("pipe:2"); + cmd.stdout(Stdio::null()).stderr(Stdio::piped()); + + let mut child = cmd + .spawn() .with_context(|| format!("Failed to run ffmpeg fix for {}", path.display()))?; - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - anyhow::bail!("ffmpeg fix failed: {}", stderr.trim()); + let stderr = child + .stderr + .take() + .context("Failed to capture ffmpeg stderr")?; + let reader = BufReader::new(stderr); + + let progress_done = Arc::new(AtomicBool::new(false)); + let progress_done_flag = progress_done.clone(); + let snapshot = Arc::new(Mutex::new(ProgressSnapshot::default())); + let snapshot_thread = snapshot.clone(); + let progress_cb = progress.clone().unwrap(); + + let mut out_time: Option = None; + let mut speed: Option = None; + let mut error_lines = Vec::new(); + + for line in reader.lines() { + let line = match line { + Ok(line) => line, + Err(_) => break, + }; + if line.is_empty() { + continue; + } + + if let Some((key, value)) = line.split_once('=') { + match key { + "out_time_ms" => { + out_time = value.parse::().ok().map(|ms| ms / 1_000_000.0); + } + "out_time_us" => { + out_time = value.parse::().ok().map(|us| us / 1_000_000.0); + } + "out_time" => { + out_time = parse_out_time(value).or(out_time); + } + "speed" => { + speed = parse_speed(value); + } + "progress" => { + let done = value == "end"; + let percent = duration.and_then(|d| out_time.map(|t| (t / d * 100.0).min(100.0))); + if let Ok(mut snap) = snapshot_thread.lock() { + snap.out_time = out_time; + snap.speed = speed; + } + progress_cb(DecodeProgress { + out_time, + duration, + percent, + speed, + done, + }); + if done { + progress_done_flag.store(true, Ordering::SeqCst); + } + } + _ => {} + } + } else { + error_lines.push(line); + } + } + + let status = child + .wait() + .with_context(|| format!("Failed to wait for ffmpeg fix for {}", path.display()))?; + + if !progress_done.load(Ordering::SeqCst) { + let snapshot = snapshot.lock().ok(); + let out_time = snapshot.as_ref().and_then(|s| s.out_time); + let speed = snapshot.as_ref().and_then(|s| s.speed); + let percent = duration.map(|_| 100.0); + progress_cb(DecodeProgress { + out_time, + duration, + percent, + speed, + done: true, + }); + } + + if !status.success() { + let message = error_lines.join(" "); + anyhow::bail!( + "ffmpeg fix failed: {}", + if message.trim().is_empty() { + "unknown error" + } else { + message.trim() + } + ); } Ok(()) @@ -208,3 +336,38 @@ fn next_original_path(path: &Path) -> Result { anyhow::bail!("Unable to find available .original name for {}", path.display()); } + +#[derive(Default)] +struct ProgressSnapshot { + out_time: Option, + speed: Option, +} + +fn parse_speed(value: &str) -> Option { + value.trim().trim_end_matches('x').parse::().ok() +} + +fn parse_out_time(value: &str) -> Option { + let trimmed = value.trim(); + if trimmed.is_empty() { + return None; + } + if let Ok(seconds) = trimmed.parse::() { + return Some(seconds); + } + + let parts: Vec<&str> = trimmed.split(':').collect(); + if parts.len() == 3 { + let hours = parts[0].parse::().ok()?; + let minutes = parts[1].parse::().ok()?; + let seconds = parts[2].parse::().ok()?; + return Some(hours * 3600.0 + minutes * 60.0 + seconds); + } + if parts.len() == 2 { + let minutes = parts[0].parse::().ok()?; + let seconds = parts[1].parse::().ok()?; + return Some(minutes * 60.0 + seconds); + } + + None +} diff --git a/vid-repair/src/main.rs b/vid-repair/src/main.rs index e57f808..cc303ed 100644 --- a/vid-repair/src/main.rs +++ b/vid-repair/src/main.rs @@ -9,12 +9,12 @@ use rayon::prelude::*; use rayon::ThreadPoolBuilder; use vid_repair_core::config::{Config, ConfigOverrides, FixPolicy, ScanDepth}; -use vid_repair_core::fix::{self, FixOutcome}; +use vid_repair_core::fix::{self, FixOutcome, FixPlan}; use vid_repair_core::report::{ render_fix_line, render_json, render_scan_line, render_summary, FixJsonReport, ScanJsonReport, SCHEMA_VERSION, }; -use vid_repair_core::rules::{ensure_ruleset_loaded, RuleSet}; +use vid_repair_core::rules::{ensure_ruleset_loaded, RuleSet, Severity}; use vid_repair_core::scan::{scan_file_with_progress, DecodeProgress, ScanOutcome}; use vid_repair_core::{fs, watch}; @@ -542,17 +542,17 @@ fn process_fix_batch( for path in files { idx += 1; if show_progress && !in_place { - eprintln!("[FIX {}/{}] {}", idx, total, path.display()); + eprintln!("[SCAN {}/{}] {}", idx, total, path.display()); } else if in_place { - eprint!("[FIX {}/{}] {}", idx, total, path.display()); + eprint!("[SCAN {}/{}] {}", idx, total, path.display()); let _ = std::io::stderr().flush(); } - let progress = if show_progress { - Some(make_progress_callback("FIX", idx, total, &path, in_place)) + let scan_progress = if show_progress { + Some(make_progress_callback("SCAN", idx, total, &path, in_place)) } else { None }; - let scan = match scan_file_with_progress(&path, config, ruleset, progress) { + let scan = match scan_file_with_progress(&path, config, ruleset, scan_progress) { Ok(scan) => scan, Err(err) => { if in_place { @@ -563,20 +563,48 @@ fn process_fix_batch( continue; } }; + if show_progress { + eprintln!("{}", render_scan_result_line(&scan)); + } let plan = fix::planner::plan_fix(&scan.issues, config.repair.policy); + if show_progress { + eprintln!("{}", render_plan_line(&plan)); + } let outcome = if dry_run { fix::planner::plan_outcome(plan) } else { - match fix::executor::apply_fix(&path, &plan, config, ruleset) { - Ok(outcome) => outcome, - Err(err) => FixOutcome { - plan: plan.clone(), - applied: true, - success: false, - message: format!("Fix failed: {}", err), - output_path: None, - re_scan_required: true, - }, + if plan.actions.is_empty() { + fix::planner::plan_outcome(plan) + } else { + if show_progress && !in_place { + eprintln!("[FIX {}/{}] {}", idx, total, path.display()); + } else if in_place { + eprint!("[FIX {}/{}] {}", idx, total, path.display()); + let _ = std::io::stderr().flush(); + } + let fix_progress = if show_progress { + Some(make_progress_callback("FIX", idx, total, &path, in_place)) + } else { + None + }; + match fix::executor::apply_fix_with_progress( + &path, + &plan, + config, + ruleset, + scan.probe.duration, + fix_progress, + ) { + Ok(outcome) => outcome, + Err(err) => FixOutcome { + plan: plan.clone(), + applied: true, + success: false, + message: format!("Fix failed: {}", err), + output_path: None, + re_scan_required: true, + }, + } } }; scans.push(scan); @@ -601,14 +629,14 @@ fn process_fix_batch_parallel( .filter_map(|path| { let idx = started.fetch_add(1, Ordering::SeqCst) + 1; if show_progress { - eprintln!("[FIX {}/{}] {}", idx, total, path.display()); + eprintln!("[SCAN {}/{}] {}", idx, total, path.display()); } - let progress = if show_progress { - Some(make_progress_callback("FIX", idx, total, path, false)) + let scan_progress = if show_progress { + Some(make_progress_callback("SCAN", idx, total, path, false)) } else { None }; - let scan = match scan_file_with_progress(path, config, ruleset, progress) { + let scan = match scan_file_with_progress(path, config, ruleset, scan_progress) { Ok(scan) => scan, Err(err) => { eprintln!("[ERROR] {}: {}", path.display(), err); @@ -616,20 +644,45 @@ fn process_fix_batch_parallel( return None; } }; + if show_progress { + eprintln!("{}", render_scan_result_line(&scan)); + } let plan = fix::planner::plan_fix(&scan.issues, config.repair.policy); + if show_progress { + eprintln!("{}", render_plan_line(&plan)); + } let outcome = if dry_run { fix::planner::plan_outcome(plan) } else { - match fix::executor::apply_fix(path, &plan, config, ruleset) { - Ok(outcome) => outcome, - Err(err) => FixOutcome { - plan: plan.clone(), - applied: true, - success: false, - message: format!("Fix failed: {}", err), - output_path: None, - re_scan_required: true, - }, + if plan.actions.is_empty() { + fix::planner::plan_outcome(plan) + } else { + if show_progress { + eprintln!("[FIX {}/{}] {}", idx, total, path.display()); + } + let fix_progress = if show_progress { + Some(make_progress_callback("FIX", idx, total, path, false)) + } else { + None + }; + match fix::executor::apply_fix_with_progress( + path, + &plan, + config, + ruleset, + scan.probe.duration, + fix_progress, + ) { + Ok(outcome) => outcome, + Err(err) => FixOutcome { + plan: plan.clone(), + applied: true, + success: false, + message: format!("Fix failed: {}", err), + output_path: None, + re_scan_required: true, + }, + } } }; Some((scan, outcome)) @@ -651,12 +704,12 @@ fn watch_scan(paths: Vec, config: &Config, ruleset: &RuleSet) -> Result eprint!("[SCAN] {}", path.display()); let _ = std::io::stderr().flush(); } - let progress = if show_progress { + let scan_progress = if show_progress { Some(make_progress_callback("SCAN", 1, 1, &path, in_place)) } else { None }; - match scan_file_with_progress(&path, config, ruleset, progress) { + match scan_file_with_progress(&path, config, ruleset, scan_progress) { Ok(scan) => { println!("{}", render_scan_line(&scan)); } @@ -673,27 +726,53 @@ fn watch_fix(paths: Vec, config: &Config, ruleset: &RuleSet, dry_run: b let show_progress = !config.report.json && std::io::stderr().is_terminal(); let in_place = show_progress; if show_progress && !in_place { - eprintln!("[FIX] {}", path.display()); + eprintln!("[SCAN] {}", path.display()); } else if in_place { - eprint!("[FIX] {}", path.display()); + eprint!("[SCAN] {}", path.display()); let _ = std::io::stderr().flush(); } - let progress = if show_progress { - Some(make_progress_callback("FIX", 1, 1, &path, in_place)) + let scan_progress = if show_progress { + Some(make_progress_callback("SCAN", 1, 1, &path, in_place)) } else { None }; - match scan_file_with_progress(&path, config, ruleset, progress) { + match scan_file_with_progress(&path, config, ruleset, scan_progress) { Ok(scan) => { let plan = fix::planner::plan_fix(&scan.issues, config.repair.policy); + if show_progress { + eprintln!("{}", render_scan_result_line(&scan)); + eprintln!("{}", render_plan_line(&plan)); + } let outcome = if dry_run { fix::planner::plan_outcome(plan) } else { - match fix::executor::apply_fix(&path, &plan, config, ruleset) { - Ok(outcome) => outcome, - Err(err) => { - eprintln!("[ERROR] Fix failed {}: {}", path.display(), err); - return; + if plan.actions.is_empty() { + fix::planner::plan_outcome(plan) + } else { + if show_progress && !in_place { + eprintln!("[FIX] {}", path.display()); + } else if in_place { + eprint!("[FIX] {}", path.display()); + let _ = std::io::stderr().flush(); + } + let fix_progress = if show_progress { + Some(make_progress_callback("FIX", 1, 1, &path, in_place)) + } else { + None + }; + match fix::executor::apply_fix_with_progress( + &path, + &plan, + config, + ruleset, + scan.probe.duration, + fix_progress, + ) { + Ok(outcome) => outcome, + Err(err) => { + eprintln!("[ERROR] Fix failed {}: {}", path.display(), err); + return; + } } } }; @@ -810,6 +889,43 @@ fn format_duration(seconds: f64) -> String { } } +fn render_scan_result_line(scan: &ScanOutcome) -> String { + if scan.issues.is_empty() { + format!("[SCAN DONE] OK {}", scan.path.display()) + } else { + let max = scan + .issues + .iter() + .map(|issue| issue.severity) + .max_by_key(|sev| sev.rank()) + .unwrap_or(Severity::Info); + format!( + "[SCAN DONE] {} issues (max {:?}) {}", + scan.issues.len(), + max, + scan.path.display() + ) + } +} + +fn render_plan_line(plan: &FixPlan) -> String { + if plan.actions.is_empty() { + let reason = plan + .blocked_reason + .clone() + .unwrap_or_else(|| "No fix needed".to_string()); + format!("[PLAN] {} (policy: {:?})", reason, plan.policy) + } else { + let actions = plan + .actions + .iter() + .map(|action| format!("{:?}", action.kind)) + .collect::>() + .join(", "); + format!("[PLAN] {} (policy: {:?})", actions, plan.policy) + } +} + fn emit_progress_line(line: &str, done: bool, in_place: bool, state: Arc>) { if !in_place { eprintln!("{}", line);