ux: show fix phase progress and scan/plan output
This commit is contained in:
@@ -1,15 +1,31 @@
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Command;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, Mutex,
|
||||
};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use fs_err as fs;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::fix::{FixKind, FixOutcome, FixPlan};
|
||||
use crate::scan::scan_file;
|
||||
use crate::scan::{scan_file, DecodeProgress};
|
||||
use crate::rules::RuleSet;
|
||||
|
||||
pub fn apply_fix(path: &Path, plan: &FixPlan, config: &Config, ruleset: &RuleSet) -> Result<FixOutcome> {
|
||||
apply_fix_with_progress(path, plan, config, ruleset, None, None)
|
||||
}
|
||||
|
||||
pub fn apply_fix_with_progress(
|
||||
path: &Path,
|
||||
plan: &FixPlan,
|
||||
config: &Config,
|
||||
ruleset: &RuleSet,
|
||||
duration: Option<f64>,
|
||||
progress: Option<Arc<dyn Fn(DecodeProgress) + Send + Sync>>,
|
||||
) -> Result<FixOutcome> {
|
||||
if plan.actions.is_empty() {
|
||||
return Ok(FixOutcome {
|
||||
plan: plan.clone(),
|
||||
@@ -27,7 +43,7 @@ pub fn apply_fix(path: &Path, plan: &FixPlan, config: &Config, ruleset: &RuleSet
|
||||
let action = &plan.actions[0];
|
||||
let output = prepare_output_path(path, config)?;
|
||||
|
||||
run_ffmpeg_fix(path, &output.temp_path, action.kind, config)?;
|
||||
run_ffmpeg_fix(path, &output.temp_path, action.kind, config, duration, progress)?;
|
||||
|
||||
let verification = scan_file(&output.temp_path, config, ruleset)
|
||||
.with_context(|| format!("Failed to verify output {}", output.temp_path.display()))?;
|
||||
@@ -55,7 +71,14 @@ pub fn apply_fix(path: &Path, plan: &FixPlan, config: &Config, ruleset: &RuleSet
|
||||
})
|
||||
}
|
||||
|
||||
fn run_ffmpeg_fix(path: &Path, output: &Path, kind: FixKind, config: &Config) -> Result<()> {
|
||||
fn run_ffmpeg_fix(
|
||||
path: &Path,
|
||||
output: &Path,
|
||||
kind: FixKind,
|
||||
config: &Config,
|
||||
duration: Option<f64>,
|
||||
progress: Option<Arc<dyn Fn(DecodeProgress) + Send + Sync>>,
|
||||
) -> Result<()> {
|
||||
let mut cmd = Command::new(&config.ffmpeg_path);
|
||||
cmd.arg("-y").arg("-v").arg("error").arg("-i").arg(path);
|
||||
|
||||
@@ -76,6 +99,7 @@ fn run_ffmpeg_fix(path: &Path, output: &Path, kind: FixKind, config: &Config) ->
|
||||
|
||||
cmd.arg(output);
|
||||
|
||||
if progress.is_none() {
|
||||
let output = cmd
|
||||
.output()
|
||||
.with_context(|| format!("Failed to run ffmpeg fix for {}", path.display()))?;
|
||||
@@ -85,6 +109,110 @@ fn run_ffmpeg_fix(path: &Path, output: &Path, kind: FixKind, config: &Config) ->
|
||||
anyhow::bail!("ffmpeg fix failed: {}", stderr.trim());
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
cmd.arg("-nostats").arg("-progress").arg("pipe:2");
|
||||
cmd.stdout(Stdio::null()).stderr(Stdio::piped());
|
||||
|
||||
let mut child = cmd
|
||||
.spawn()
|
||||
.with_context(|| format!("Failed to run ffmpeg fix for {}", path.display()))?;
|
||||
|
||||
let stderr = child
|
||||
.stderr
|
||||
.take()
|
||||
.context("Failed to capture ffmpeg stderr")?;
|
||||
let reader = BufReader::new(stderr);
|
||||
|
||||
let progress_done = Arc::new(AtomicBool::new(false));
|
||||
let progress_done_flag = progress_done.clone();
|
||||
let snapshot = Arc::new(Mutex::new(ProgressSnapshot::default()));
|
||||
let snapshot_thread = snapshot.clone();
|
||||
let progress_cb = progress.clone().unwrap();
|
||||
|
||||
let mut out_time: Option<f64> = None;
|
||||
let mut speed: Option<f64> = None;
|
||||
let mut error_lines = Vec::new();
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = match line {
|
||||
Ok(line) => line,
|
||||
Err(_) => break,
|
||||
};
|
||||
if line.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some((key, value)) = line.split_once('=') {
|
||||
match key {
|
||||
"out_time_ms" => {
|
||||
out_time = value.parse::<f64>().ok().map(|ms| ms / 1_000_000.0);
|
||||
}
|
||||
"out_time_us" => {
|
||||
out_time = value.parse::<f64>().ok().map(|us| us / 1_000_000.0);
|
||||
}
|
||||
"out_time" => {
|
||||
out_time = parse_out_time(value).or(out_time);
|
||||
}
|
||||
"speed" => {
|
||||
speed = parse_speed(value);
|
||||
}
|
||||
"progress" => {
|
||||
let done = value == "end";
|
||||
let percent = duration.and_then(|d| out_time.map(|t| (t / d * 100.0).min(100.0)));
|
||||
if let Ok(mut snap) = snapshot_thread.lock() {
|
||||
snap.out_time = out_time;
|
||||
snap.speed = speed;
|
||||
}
|
||||
progress_cb(DecodeProgress {
|
||||
out_time,
|
||||
duration,
|
||||
percent,
|
||||
speed,
|
||||
done,
|
||||
});
|
||||
if done {
|
||||
progress_done_flag.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
} else {
|
||||
error_lines.push(line);
|
||||
}
|
||||
}
|
||||
|
||||
let status = child
|
||||
.wait()
|
||||
.with_context(|| format!("Failed to wait for ffmpeg fix for {}", path.display()))?;
|
||||
|
||||
if !progress_done.load(Ordering::SeqCst) {
|
||||
let snapshot = snapshot.lock().ok();
|
||||
let out_time = snapshot.as_ref().and_then(|s| s.out_time);
|
||||
let speed = snapshot.as_ref().and_then(|s| s.speed);
|
||||
let percent = duration.map(|_| 100.0);
|
||||
progress_cb(DecodeProgress {
|
||||
out_time,
|
||||
duration,
|
||||
percent,
|
||||
speed,
|
||||
done: true,
|
||||
});
|
||||
}
|
||||
|
||||
if !status.success() {
|
||||
let message = error_lines.join(" ");
|
||||
anyhow::bail!(
|
||||
"ffmpeg fix failed: {}",
|
||||
if message.trim().is_empty() {
|
||||
"unknown error"
|
||||
} else {
|
||||
message.trim()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -208,3 +336,38 @@ fn next_original_path(path: &Path) -> Result<PathBuf> {
|
||||
|
||||
anyhow::bail!("Unable to find available .original name for {}", path.display());
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ProgressSnapshot {
|
||||
out_time: Option<f64>,
|
||||
speed: Option<f64>,
|
||||
}
|
||||
|
||||
fn parse_speed(value: &str) -> Option<f64> {
|
||||
value.trim().trim_end_matches('x').parse::<f64>().ok()
|
||||
}
|
||||
|
||||
fn parse_out_time(value: &str) -> Option<f64> {
|
||||
let trimmed = value.trim();
|
||||
if trimmed.is_empty() {
|
||||
return None;
|
||||
}
|
||||
if let Ok(seconds) = trimmed.parse::<f64>() {
|
||||
return Some(seconds);
|
||||
}
|
||||
|
||||
let parts: Vec<&str> = trimmed.split(':').collect();
|
||||
if parts.len() == 3 {
|
||||
let hours = parts[0].parse::<f64>().ok()?;
|
||||
let minutes = parts[1].parse::<f64>().ok()?;
|
||||
let seconds = parts[2].parse::<f64>().ok()?;
|
||||
return Some(hours * 3600.0 + minutes * 60.0 + seconds);
|
||||
}
|
||||
if parts.len() == 2 {
|
||||
let minutes = parts[0].parse::<f64>().ok()?;
|
||||
let seconds = parts[1].parse::<f64>().ok()?;
|
||||
return Some(minutes * 60.0 + seconds);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
@@ -9,12 +9,12 @@ use rayon::prelude::*;
|
||||
use rayon::ThreadPoolBuilder;
|
||||
|
||||
use vid_repair_core::config::{Config, ConfigOverrides, FixPolicy, ScanDepth};
|
||||
use vid_repair_core::fix::{self, FixOutcome};
|
||||
use vid_repair_core::fix::{self, FixOutcome, FixPlan};
|
||||
use vid_repair_core::report::{
|
||||
render_fix_line, render_json, render_scan_line, render_summary, FixJsonReport,
|
||||
ScanJsonReport, SCHEMA_VERSION,
|
||||
};
|
||||
use vid_repair_core::rules::{ensure_ruleset_loaded, RuleSet};
|
||||
use vid_repair_core::rules::{ensure_ruleset_loaded, RuleSet, Severity};
|
||||
use vid_repair_core::scan::{scan_file_with_progress, DecodeProgress, ScanOutcome};
|
||||
use vid_repair_core::{fs, watch};
|
||||
|
||||
@@ -542,17 +542,17 @@ fn process_fix_batch(
|
||||
for path in files {
|
||||
idx += 1;
|
||||
if show_progress && !in_place {
|
||||
eprintln!("[FIX {}/{}] {}", idx, total, path.display());
|
||||
eprintln!("[SCAN {}/{}] {}", idx, total, path.display());
|
||||
} else if in_place {
|
||||
eprint!("[FIX {}/{}] {}", idx, total, path.display());
|
||||
eprint!("[SCAN {}/{}] {}", idx, total, path.display());
|
||||
let _ = std::io::stderr().flush();
|
||||
}
|
||||
let progress = if show_progress {
|
||||
Some(make_progress_callback("FIX", idx, total, &path, in_place))
|
||||
let scan_progress = if show_progress {
|
||||
Some(make_progress_callback("SCAN", idx, total, &path, in_place))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let scan = match scan_file_with_progress(&path, config, ruleset, progress) {
|
||||
let scan = match scan_file_with_progress(&path, config, ruleset, scan_progress) {
|
||||
Ok(scan) => scan,
|
||||
Err(err) => {
|
||||
if in_place {
|
||||
@@ -563,11 +563,38 @@ fn process_fix_batch(
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if show_progress {
|
||||
eprintln!("{}", render_scan_result_line(&scan));
|
||||
}
|
||||
let plan = fix::planner::plan_fix(&scan.issues, config.repair.policy);
|
||||
if show_progress {
|
||||
eprintln!("{}", render_plan_line(&plan));
|
||||
}
|
||||
let outcome = if dry_run {
|
||||
fix::planner::plan_outcome(plan)
|
||||
} else {
|
||||
match fix::executor::apply_fix(&path, &plan, config, ruleset) {
|
||||
if plan.actions.is_empty() {
|
||||
fix::planner::plan_outcome(plan)
|
||||
} else {
|
||||
if show_progress && !in_place {
|
||||
eprintln!("[FIX {}/{}] {}", idx, total, path.display());
|
||||
} else if in_place {
|
||||
eprint!("[FIX {}/{}] {}", idx, total, path.display());
|
||||
let _ = std::io::stderr().flush();
|
||||
}
|
||||
let fix_progress = if show_progress {
|
||||
Some(make_progress_callback("FIX", idx, total, &path, in_place))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
match fix::executor::apply_fix_with_progress(
|
||||
&path,
|
||||
&plan,
|
||||
config,
|
||||
ruleset,
|
||||
scan.probe.duration,
|
||||
fix_progress,
|
||||
) {
|
||||
Ok(outcome) => outcome,
|
||||
Err(err) => FixOutcome {
|
||||
plan: plan.clone(),
|
||||
@@ -578,6 +605,7 @@ fn process_fix_batch(
|
||||
re_scan_required: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
scans.push(scan);
|
||||
fixes.push(outcome);
|
||||
@@ -601,14 +629,14 @@ fn process_fix_batch_parallel(
|
||||
.filter_map(|path| {
|
||||
let idx = started.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
if show_progress {
|
||||
eprintln!("[FIX {}/{}] {}", idx, total, path.display());
|
||||
eprintln!("[SCAN {}/{}] {}", idx, total, path.display());
|
||||
}
|
||||
let progress = if show_progress {
|
||||
Some(make_progress_callback("FIX", idx, total, path, false))
|
||||
let scan_progress = if show_progress {
|
||||
Some(make_progress_callback("SCAN", idx, total, path, false))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let scan = match scan_file_with_progress(path, config, ruleset, progress) {
|
||||
let scan = match scan_file_with_progress(path, config, ruleset, scan_progress) {
|
||||
Ok(scan) => scan,
|
||||
Err(err) => {
|
||||
eprintln!("[ERROR] {}: {}", path.display(), err);
|
||||
@@ -616,11 +644,35 @@ fn process_fix_batch_parallel(
|
||||
return None;
|
||||
}
|
||||
};
|
||||
if show_progress {
|
||||
eprintln!("{}", render_scan_result_line(&scan));
|
||||
}
|
||||
let plan = fix::planner::plan_fix(&scan.issues, config.repair.policy);
|
||||
if show_progress {
|
||||
eprintln!("{}", render_plan_line(&plan));
|
||||
}
|
||||
let outcome = if dry_run {
|
||||
fix::planner::plan_outcome(plan)
|
||||
} else {
|
||||
match fix::executor::apply_fix(path, &plan, config, ruleset) {
|
||||
if plan.actions.is_empty() {
|
||||
fix::planner::plan_outcome(plan)
|
||||
} else {
|
||||
if show_progress {
|
||||
eprintln!("[FIX {}/{}] {}", idx, total, path.display());
|
||||
}
|
||||
let fix_progress = if show_progress {
|
||||
Some(make_progress_callback("FIX", idx, total, path, false))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
match fix::executor::apply_fix_with_progress(
|
||||
path,
|
||||
&plan,
|
||||
config,
|
||||
ruleset,
|
||||
scan.probe.duration,
|
||||
fix_progress,
|
||||
) {
|
||||
Ok(outcome) => outcome,
|
||||
Err(err) => FixOutcome {
|
||||
plan: plan.clone(),
|
||||
@@ -631,6 +683,7 @@ fn process_fix_batch_parallel(
|
||||
re_scan_required: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
Some((scan, outcome))
|
||||
})
|
||||
@@ -651,12 +704,12 @@ fn watch_scan(paths: Vec<PathBuf>, config: &Config, ruleset: &RuleSet) -> Result
|
||||
eprint!("[SCAN] {}", path.display());
|
||||
let _ = std::io::stderr().flush();
|
||||
}
|
||||
let progress = if show_progress {
|
||||
let scan_progress = if show_progress {
|
||||
Some(make_progress_callback("SCAN", 1, 1, &path, in_place))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
match scan_file_with_progress(&path, config, ruleset, progress) {
|
||||
match scan_file_with_progress(&path, config, ruleset, scan_progress) {
|
||||
Ok(scan) => {
|
||||
println!("{}", render_scan_line(&scan));
|
||||
}
|
||||
@@ -672,30 +725,56 @@ fn watch_fix(paths: Vec<PathBuf>, config: &Config, ruleset: &RuleSet, dry_run: b
|
||||
watch::watch_paths(&paths, config, |path| {
|
||||
let show_progress = !config.report.json && std::io::stderr().is_terminal();
|
||||
let in_place = show_progress;
|
||||
if show_progress && !in_place {
|
||||
eprintln!("[SCAN] {}", path.display());
|
||||
} else if in_place {
|
||||
eprint!("[SCAN] {}", path.display());
|
||||
let _ = std::io::stderr().flush();
|
||||
}
|
||||
let scan_progress = if show_progress {
|
||||
Some(make_progress_callback("SCAN", 1, 1, &path, in_place))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
match scan_file_with_progress(&path, config, ruleset, scan_progress) {
|
||||
Ok(scan) => {
|
||||
let plan = fix::planner::plan_fix(&scan.issues, config.repair.policy);
|
||||
if show_progress {
|
||||
eprintln!("{}", render_scan_result_line(&scan));
|
||||
eprintln!("{}", render_plan_line(&plan));
|
||||
}
|
||||
let outcome = if dry_run {
|
||||
fix::planner::plan_outcome(plan)
|
||||
} else {
|
||||
if plan.actions.is_empty() {
|
||||
fix::planner::plan_outcome(plan)
|
||||
} else {
|
||||
if show_progress && !in_place {
|
||||
eprintln!("[FIX] {}", path.display());
|
||||
} else if in_place {
|
||||
eprint!("[FIX] {}", path.display());
|
||||
let _ = std::io::stderr().flush();
|
||||
}
|
||||
let progress = if show_progress {
|
||||
let fix_progress = if show_progress {
|
||||
Some(make_progress_callback("FIX", 1, 1, &path, in_place))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
match scan_file_with_progress(&path, config, ruleset, progress) {
|
||||
Ok(scan) => {
|
||||
let plan = fix::planner::plan_fix(&scan.issues, config.repair.policy);
|
||||
let outcome = if dry_run {
|
||||
fix::planner::plan_outcome(plan)
|
||||
} else {
|
||||
match fix::executor::apply_fix(&path, &plan, config, ruleset) {
|
||||
match fix::executor::apply_fix_with_progress(
|
||||
&path,
|
||||
&plan,
|
||||
config,
|
||||
ruleset,
|
||||
scan.probe.duration,
|
||||
fix_progress,
|
||||
) {
|
||||
Ok(outcome) => outcome,
|
||||
Err(err) => {
|
||||
eprintln!("[ERROR] Fix failed {}: {}", path.display(), err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
println!("{}", render_fix_line(&scan, &outcome));
|
||||
}
|
||||
@@ -810,6 +889,43 @@ fn format_duration(seconds: f64) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
fn render_scan_result_line(scan: &ScanOutcome) -> String {
|
||||
if scan.issues.is_empty() {
|
||||
format!("[SCAN DONE] OK {}", scan.path.display())
|
||||
} else {
|
||||
let max = scan
|
||||
.issues
|
||||
.iter()
|
||||
.map(|issue| issue.severity)
|
||||
.max_by_key(|sev| sev.rank())
|
||||
.unwrap_or(Severity::Info);
|
||||
format!(
|
||||
"[SCAN DONE] {} issues (max {:?}) {}",
|
||||
scan.issues.len(),
|
||||
max,
|
||||
scan.path.display()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn render_plan_line(plan: &FixPlan) -> String {
|
||||
if plan.actions.is_empty() {
|
||||
let reason = plan
|
||||
.blocked_reason
|
||||
.clone()
|
||||
.unwrap_or_else(|| "No fix needed".to_string());
|
||||
format!("[PLAN] {} (policy: {:?})", reason, plan.policy)
|
||||
} else {
|
||||
let actions = plan
|
||||
.actions
|
||||
.iter()
|
||||
.map(|action| format!("{:?}", action.kind))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
format!("[PLAN] {} (policy: {:?})", actions, plan.policy)
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_progress_line(line: &str, done: bool, in_place: bool, state: Arc<Mutex<ProgressState>>) {
|
||||
if !in_place {
|
||||
eprintln!("{}", line);
|
||||
|
||||
Reference in New Issue
Block a user