diff --git a/vid-repair-core/src/scan/decode.rs b/vid-repair-core/src/scan/decode.rs index 3e75800..7f272d1 100644 --- a/vid-repair-core/src/scan/decode.rs +++ b/vid-repair-core/src/scan/decode.rs @@ -1,12 +1,17 @@ use std::io::{BufRead, BufReader}; use std::path::Path; use std::process::{Command, Stdio}; -use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; +use std::thread; use anyhow::{Context, Result}; use crate::config::ScanDepth; use crate::rules::RuleSet; +use crate::scan::DecodeProgress; #[derive(Debug)] pub struct DecodeOutput { @@ -19,18 +24,30 @@ pub fn run_decode( ffmpeg_path: &str, ruleset: &RuleSet, depth: ScanDepth, + duration: Option, + progress: Option>, ) -> Result { - let mut child = Command::new(ffmpeg_path) + let mut command = Command::new(ffmpeg_path); + + command .arg("-v") .arg("error") .arg("-i") .arg(path) - .args(depth_args(depth)) + .args(depth_args(depth)); + + if progress.is_some() { + command.arg("-nostats").arg("-progress").arg("pipe:1"); + command.stdout(Stdio::piped()); + } else { + command.stdout(Stdio::null()); + } + + let mut child = command .arg("-f") .arg("null") .arg("-") .stderr(Stdio::piped()) - .stdout(Stdio::null()) .spawn() .with_context(|| format!("Failed to run ffmpeg decode for {}", path.display()))?; @@ -39,8 +56,72 @@ pub fn run_decode( let early_stop = Arc::new(AtomicBool::new(false)); let early_stop_flag = early_stop.clone(); + let progress_done = Arc::new(AtomicBool::new(false)); + let progress_done_flag = progress_done.clone(); + + let progress_snapshot = Arc::new(Mutex::new(ProgressSnapshot::default())); + let progress_snapshot_thread = progress_snapshot.clone(); let mut lines = Vec::new(); + let progress_thread = if let (Some(callback), Some(stdout)) = (progress.clone(), child.stdout.take()) { + Some(thread::spawn(move || { + let reader = BufReader::new(stdout); + let mut out_time: Option = None; + let mut speed: Option = None; + + for line in reader.lines() { + let line = match line { + Ok(line) => line, + Err(_) => break, + }; + if line.is_empty() { + continue; + } + + if let Some((key, value)) = line.split_once('=') { + match key { + "out_time_ms" => { + out_time = value.parse::().ok().map(|ms| ms / 1_000_000.0); + } + "out_time_us" => { + out_time = value.parse::().ok().map(|us| us / 1_000_000.0); + } + "out_time" => { + out_time = parse_out_time(value).or(out_time); + } + "speed" => { + speed = parse_speed(value); + } + "progress" => { + let done = value == "end"; + let percent = duration.and_then(|d| out_time.map(|t| (t / d * 100.0).min(100.0))); + + if let Ok(mut snapshot) = progress_snapshot_thread.lock() { + snapshot.out_time = out_time; + snapshot.speed = speed; + } + + callback(DecodeProgress { + out_time, + duration, + percent, + speed, + done, + }); + + if done { + progress_done_flag.store(true, Ordering::SeqCst); + break; + } + } + _ => {} + } + } + } + })) + } else { + None + }; for line in reader.lines() { let line = line.unwrap_or_default(); @@ -58,6 +139,25 @@ pub fn run_decode( } let _ = child.wait(); + if let Some(handle) = progress_thread { + let _ = handle.join(); + } + + if let Some(callback) = progress { + if !progress_done.load(Ordering::SeqCst) { + let snapshot = progress_snapshot.lock().ok(); + let out_time = snapshot.as_ref().and_then(|s| s.out_time); + let speed = snapshot.as_ref().and_then(|s| s.speed); + let percent = duration.map(|_| 100.0); + callback(DecodeProgress { + out_time, + duration, + percent, + speed, + done: true, + }); + } + } Ok(DecodeOutput { lines, @@ -85,3 +185,38 @@ fn should_stop(line: &str, ruleset: &RuleSet) -> bool { } false } + +#[derive(Default)] +struct ProgressSnapshot { + out_time: Option, + speed: Option, +} + +fn parse_speed(value: &str) -> Option { + value.trim().trim_end_matches('x').parse::().ok() +} + +fn parse_out_time(value: &str) -> Option { + let trimmed = value.trim(); + if trimmed.is_empty() { + return None; + } + if let Ok(seconds) = trimmed.parse::() { + return Some(seconds); + } + + let parts: Vec<&str> = trimmed.split(':').collect(); + if parts.len() == 3 { + let hours = parts[0].parse::().ok()?; + let minutes = parts[1].parse::().ok()?; + let seconds = parts[2].parse::().ok()?; + return Some(hours * 3600.0 + minutes * 60.0 + seconds); + } + if parts.len() == 2 { + let minutes = parts[0].parse::().ok()?; + let seconds = parts[1].parse::().ok()?; + return Some(minutes * 60.0 + seconds); + } + + None +} diff --git a/vid-repair-core/src/scan/mod.rs b/vid-repair-core/src/scan/mod.rs index ee9dc10..ae41161 100644 --- a/vid-repair-core/src/scan/mod.rs +++ b/vid-repair-core/src/scan/mod.rs @@ -9,12 +9,29 @@ mod decode; mod ffprobe; mod types; -pub use types::{Issue, ProbeData, ScanOutcome, ScanRequest}; +pub use types::{DecodeProgress, Issue, ProbeData, ScanOutcome, ScanRequest}; pub fn scan_file(path: &Path, config: &Config, ruleset: &RuleSet) -> Result { + scan_file_with_progress(path, config, ruleset, None) +} + +pub fn scan_file_with_progress( + path: &Path, + config: &Config, + ruleset: &RuleSet, + progress: Option>, +) -> Result { let probe = ffprobe::run_ffprobe(path, &config.ffprobe_path)?; - let mut decode = decode::run_decode(path, &config.ffmpeg_path, ruleset, config.scan.depth)?; + let duration = probe.duration; + let mut decode = decode::run_decode( + path, + &config.ffmpeg_path, + ruleset, + config.scan.depth, + duration, + progress.clone(), + )?; let context = build_context(&probe); let mut matches = ruleset.match_lines(&decode.lines, &context); @@ -25,7 +42,14 @@ pub fn scan_file(path: &Path, config: &Config, ruleset: &RuleSet) -> Result, + pub duration: Option, + pub percent: Option, + pub speed: Option, + pub done: bool, +} + #[derive(Debug, Clone)] pub struct ScanRequest { pub path: PathBuf, diff --git a/vid-repair/src/main.rs b/vid-repair/src/main.rs index c0ace87..7256c2a 100644 --- a/vid-repair/src/main.rs +++ b/vid-repair/src/main.rs @@ -1,5 +1,7 @@ -use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::io::IsTerminal; +use std::path::{Path, PathBuf}; +use std::sync::{atomic::{AtomicUsize, Ordering}, Arc, Mutex}; +use std::time::{Duration, Instant}; use anyhow::Result; use clap::{Parser, Subcommand, ValueEnum}; @@ -13,7 +15,7 @@ use vid_repair_core::report::{ ScanJsonReport, SCHEMA_VERSION, }; use vid_repair_core::rules::{ensure_ruleset_loaded, RuleSet}; -use vid_repair_core::scan::{scan_file, ScanOutcome}; +use vid_repair_core::scan::{scan_file_with_progress, DecodeProgress, ScanOutcome}; use vid_repair_core::{fs, watch}; #[derive(Parser, Debug)] @@ -431,7 +433,7 @@ fn run_scans(files: Vec, config: &Config, ruleset: &RuleSet) -> Result< let errors = AtomicUsize::new(0); let started = AtomicUsize::new(0); let total = files.len(); - let show_progress = !config.report.json; + let show_progress = !config.report.json && std::io::stderr().is_terminal(); let scans = if let Some(jobs) = jobs { let pool = ThreadPoolBuilder::new().num_threads(jobs).build()?; @@ -440,10 +442,15 @@ fn run_scans(files: Vec, config: &Config, ruleset: &RuleSet) -> Result< .par_iter() .filter_map(|path| { let idx = started.fetch_add(1, Ordering::SeqCst) + 1; + let progress = if show_progress { + Some(make_progress_callback("SCAN", idx, total, path)) + } else { + None + }; if show_progress { eprintln!("[SCAN {}/{}] {}", idx, total, path.display()); } - match scan_file(path, config, ruleset) { + match scan_file_with_progress(path, config, ruleset, progress) { Ok(scan) => Some(scan), Err(err) => { eprintln!("[ERROR] {}: {}", path.display(), err); @@ -459,10 +466,15 @@ fn run_scans(files: Vec, config: &Config, ruleset: &RuleSet) -> Result< .iter() .filter_map(|path| { let idx = started.fetch_add(1, Ordering::SeqCst) + 1; + let progress = if show_progress { + Some(make_progress_callback("SCAN", idx, total, path)) + } else { + None + }; if show_progress { eprintln!("[SCAN {}/{}] {}", idx, total, path.display()); } - match scan_file(path, config, ruleset) { + match scan_file_with_progress(path, config, ruleset, progress) { Ok(scan) => Some(scan), Err(err) => { eprintln!("[ERROR] {}: {}", path.display(), err); @@ -510,7 +522,7 @@ fn process_fix_batch( let mut fixes = Vec::new(); let mut errors = 0usize; let total = files.len(); - let show_progress = !config.report.json; + let show_progress = !config.report.json && std::io::stderr().is_terminal(); let mut idx = 0usize; for path in files { @@ -518,7 +530,12 @@ fn process_fix_batch( if show_progress { eprintln!("[FIX {}/{}] {}", idx, total, path.display()); } - let scan = match scan_file(&path, config, ruleset) { + let progress = if show_progress { + Some(make_progress_callback("FIX", idx, total, &path)) + } else { + None + }; + let scan = match scan_file_with_progress(&path, config, ruleset, progress) { Ok(scan) => scan, Err(err) => { eprintln!("[ERROR] {}: {}", path.display(), err); @@ -558,7 +575,7 @@ fn process_fix_batch_parallel( let errors = AtomicUsize::new(0); let started = AtomicUsize::new(0); let total = files.len(); - let show_progress = !config.report.json; + let show_progress = !config.report.json && std::io::stderr().is_terminal(); let results = files .par_iter() .filter_map(|path| { @@ -566,7 +583,12 @@ fn process_fix_batch_parallel( if show_progress { eprintln!("[FIX {}/{}] {}", idx, total, path.display()); } - let scan = match scan_file(path, config, ruleset) { + let progress = if show_progress { + Some(make_progress_callback("FIX", idx, total, path)) + } else { + None + }; + let scan = match scan_file_with_progress(path, config, ruleset, progress) { Ok(scan) => scan, Err(err) => { eprintln!("[ERROR] {}: {}", path.display(), err); @@ -604,7 +626,12 @@ fn watch_scan(paths: Vec, config: &Config, ruleset: &RuleSet) -> Result if !config.report.json { eprintln!("[SCAN] {}", path.display()); } - match scan_file(&path, config, ruleset) { + let progress = if !config.report.json && std::io::stderr().is_terminal() { + Some(make_progress_callback("SCAN", 1, 1, &path)) + } else { + None + }; + match scan_file_with_progress(&path, config, ruleset, progress) { Ok(scan) => { println!("{}", render_scan_line(&scan)); } @@ -621,7 +648,12 @@ fn watch_fix(paths: Vec, config: &Config, ruleset: &RuleSet, dry_run: b if !config.report.json { eprintln!("[FIX] {}", path.display()); } - match scan_file(&path, config, ruleset) { + let progress = if !config.report.json && std::io::stderr().is_terminal() { + Some(make_progress_callback("FIX", 1, 1, &path)) + } else { + None + }; + match scan_file_with_progress(&path, config, ruleset, progress) { Ok(scan) => { let plan = fix::planner::plan_fix(&scan.issues, config.repair.policy); let outcome = if dry_run { @@ -643,3 +675,89 @@ fn watch_fix(paths: Vec, config: &Config, ruleset: &RuleSet, dry_run: b } }) } + +#[derive(Debug)] +struct ProgressState { + last_emit: Instant, + last_percent: Option, +} + +fn make_progress_callback( + kind: &'static str, + idx: usize, + total: usize, + path: &Path, +) -> Arc { + let prefix = format!("[{} {}/{}]", kind, idx, total); + let path_display = path.display().to_string(); + let now = Instant::now(); + let state = Arc::new(Mutex::new(ProgressState { + last_emit: now.checked_sub(Duration::from_secs(2)).unwrap_or(now), + last_percent: None, + })); + + Arc::new(move |progress: DecodeProgress| { + let now = Instant::now(); + let mut should_emit = progress.done; + let percent = progress + .percent + .map(|value| value.max(0.0).min(100.0).floor() as u8); + + if let Ok(mut state) = state.lock() { + if let Some(pct) = percent { + if state.last_percent.map_or(true, |last| pct >= last.saturating_add(1)) { + state.last_percent = Some(pct); + should_emit = true; + } + } + + if !should_emit && now.duration_since(state.last_emit) < Duration::from_secs(1) { + return; + } + + state.last_emit = now; + } + + let mut line = String::new(); + line.push_str(&prefix); + line.push(' '); + + if let Some(pct) = percent { + line.push_str(&format!("{:>3}% ", pct)); + } + + match (progress.out_time, progress.duration) { + (Some(out_time), Some(duration)) => { + line.push_str(&format!( + "{} / {} ", + format_duration(out_time), + format_duration(duration) + )); + } + (Some(out_time), None) => { + line.push_str(&format!("{} elapsed ", format_duration(out_time))); + } + _ => {} + } + + if let Some(speed) = progress.speed { + line.push_str(&format!("@{:.1}x ", speed)); + } + + line.push_str(&path_display); + eprintln!("{}", line); + }) +} + +fn format_duration(seconds: f64) -> String { + let total = seconds.max(0.0).round() as u64; + let hours = total / 3600; + let minutes = (total % 3600) / 60; + let secs = total % 60; + + if hours > 0 { + format!("{:02}:{:02}:{:02}", hours, minutes, secs) + } else { + format!("{:02}:{:02}", minutes, secs) + } +}