ux: show per-file scan progress via ffmpeg
This commit is contained in:
@@ -1,12 +1,17 @@
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, Mutex,
|
||||
};
|
||||
use std::thread;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
|
||||
use crate::config::ScanDepth;
|
||||
use crate::rules::RuleSet;
|
||||
use crate::scan::DecodeProgress;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DecodeOutput {
|
||||
@@ -19,18 +24,30 @@ pub fn run_decode(
|
||||
ffmpeg_path: &str,
|
||||
ruleset: &RuleSet,
|
||||
depth: ScanDepth,
|
||||
duration: Option<f64>,
|
||||
progress: Option<Arc<dyn Fn(DecodeProgress) + Send + Sync>>,
|
||||
) -> Result<DecodeOutput> {
|
||||
let mut child = Command::new(ffmpeg_path)
|
||||
let mut command = Command::new(ffmpeg_path);
|
||||
|
||||
command
|
||||
.arg("-v")
|
||||
.arg("error")
|
||||
.arg("-i")
|
||||
.arg(path)
|
||||
.args(depth_args(depth))
|
||||
.args(depth_args(depth));
|
||||
|
||||
if progress.is_some() {
|
||||
command.arg("-nostats").arg("-progress").arg("pipe:1");
|
||||
command.stdout(Stdio::piped());
|
||||
} else {
|
||||
command.stdout(Stdio::null());
|
||||
}
|
||||
|
||||
let mut child = command
|
||||
.arg("-f")
|
||||
.arg("null")
|
||||
.arg("-")
|
||||
.stderr(Stdio::piped())
|
||||
.stdout(Stdio::null())
|
||||
.spawn()
|
||||
.with_context(|| format!("Failed to run ffmpeg decode for {}", path.display()))?;
|
||||
|
||||
@@ -39,8 +56,72 @@ pub fn run_decode(
|
||||
|
||||
let early_stop = Arc::new(AtomicBool::new(false));
|
||||
let early_stop_flag = early_stop.clone();
|
||||
let progress_done = Arc::new(AtomicBool::new(false));
|
||||
let progress_done_flag = progress_done.clone();
|
||||
|
||||
let progress_snapshot = Arc::new(Mutex::new(ProgressSnapshot::default()));
|
||||
let progress_snapshot_thread = progress_snapshot.clone();
|
||||
|
||||
let mut lines = Vec::new();
|
||||
let progress_thread = if let (Some(callback), Some(stdout)) = (progress.clone(), child.stdout.take()) {
|
||||
Some(thread::spawn(move || {
|
||||
let reader = BufReader::new(stdout);
|
||||
let mut out_time: Option<f64> = None;
|
||||
let mut speed: Option<f64> = None;
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = match line {
|
||||
Ok(line) => line,
|
||||
Err(_) => break,
|
||||
};
|
||||
if line.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some((key, value)) = line.split_once('=') {
|
||||
match key {
|
||||
"out_time_ms" => {
|
||||
out_time = value.parse::<f64>().ok().map(|ms| ms / 1_000_000.0);
|
||||
}
|
||||
"out_time_us" => {
|
||||
out_time = value.parse::<f64>().ok().map(|us| us / 1_000_000.0);
|
||||
}
|
||||
"out_time" => {
|
||||
out_time = parse_out_time(value).or(out_time);
|
||||
}
|
||||
"speed" => {
|
||||
speed = parse_speed(value);
|
||||
}
|
||||
"progress" => {
|
||||
let done = value == "end";
|
||||
let percent = duration.and_then(|d| out_time.map(|t| (t / d * 100.0).min(100.0)));
|
||||
|
||||
if let Ok(mut snapshot) = progress_snapshot_thread.lock() {
|
||||
snapshot.out_time = out_time;
|
||||
snapshot.speed = speed;
|
||||
}
|
||||
|
||||
callback(DecodeProgress {
|
||||
out_time,
|
||||
duration,
|
||||
percent,
|
||||
speed,
|
||||
done,
|
||||
});
|
||||
|
||||
if done {
|
||||
progress_done_flag.store(true, Ordering::SeqCst);
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = line.unwrap_or_default();
|
||||
@@ -58,6 +139,25 @@ pub fn run_decode(
|
||||
}
|
||||
|
||||
let _ = child.wait();
|
||||
if let Some(handle) = progress_thread {
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
if let Some(callback) = progress {
|
||||
if !progress_done.load(Ordering::SeqCst) {
|
||||
let snapshot = progress_snapshot.lock().ok();
|
||||
let out_time = snapshot.as_ref().and_then(|s| s.out_time);
|
||||
let speed = snapshot.as_ref().and_then(|s| s.speed);
|
||||
let percent = duration.map(|_| 100.0);
|
||||
callback(DecodeProgress {
|
||||
out_time,
|
||||
duration,
|
||||
percent,
|
||||
speed,
|
||||
done: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DecodeOutput {
|
||||
lines,
|
||||
@@ -85,3 +185,38 @@ fn should_stop(line: &str, ruleset: &RuleSet) -> bool {
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ProgressSnapshot {
|
||||
out_time: Option<f64>,
|
||||
speed: Option<f64>,
|
||||
}
|
||||
|
||||
fn parse_speed(value: &str) -> Option<f64> {
|
||||
value.trim().trim_end_matches('x').parse::<f64>().ok()
|
||||
}
|
||||
|
||||
fn parse_out_time(value: &str) -> Option<f64> {
|
||||
let trimmed = value.trim();
|
||||
if trimmed.is_empty() {
|
||||
return None;
|
||||
}
|
||||
if let Ok(seconds) = trimmed.parse::<f64>() {
|
||||
return Some(seconds);
|
||||
}
|
||||
|
||||
let parts: Vec<&str> = trimmed.split(':').collect();
|
||||
if parts.len() == 3 {
|
||||
let hours = parts[0].parse::<f64>().ok()?;
|
||||
let minutes = parts[1].parse::<f64>().ok()?;
|
||||
let seconds = parts[2].parse::<f64>().ok()?;
|
||||
return Some(hours * 3600.0 + minutes * 60.0 + seconds);
|
||||
}
|
||||
if parts.len() == 2 {
|
||||
let minutes = parts[0].parse::<f64>().ok()?;
|
||||
let seconds = parts[1].parse::<f64>().ok()?;
|
||||
return Some(minutes * 60.0 + seconds);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
@@ -9,12 +9,29 @@ mod decode;
|
||||
mod ffprobe;
|
||||
mod types;
|
||||
|
||||
pub use types::{Issue, ProbeData, ScanOutcome, ScanRequest};
|
||||
pub use types::{DecodeProgress, Issue, ProbeData, ScanOutcome, ScanRequest};
|
||||
|
||||
pub fn scan_file(path: &Path, config: &Config, ruleset: &RuleSet) -> Result<ScanOutcome> {
|
||||
scan_file_with_progress(path, config, ruleset, None)
|
||||
}
|
||||
|
||||
pub fn scan_file_with_progress(
|
||||
path: &Path,
|
||||
config: &Config,
|
||||
ruleset: &RuleSet,
|
||||
progress: Option<std::sync::Arc<dyn Fn(DecodeProgress) + Send + Sync>>,
|
||||
) -> Result<ScanOutcome> {
|
||||
let probe = ffprobe::run_ffprobe(path, &config.ffprobe_path)?;
|
||||
|
||||
let mut decode = decode::run_decode(path, &config.ffmpeg_path, ruleset, config.scan.depth)?;
|
||||
let duration = probe.duration;
|
||||
let mut decode = decode::run_decode(
|
||||
path,
|
||||
&config.ffmpeg_path,
|
||||
ruleset,
|
||||
config.scan.depth,
|
||||
duration,
|
||||
progress.clone(),
|
||||
)?;
|
||||
|
||||
let context = build_context(&probe);
|
||||
let mut matches = ruleset.match_lines(&decode.lines, &context);
|
||||
@@ -25,7 +42,14 @@ pub fn scan_file(path: &Path, config: &Config, ruleset: &RuleSet) -> Result<Scan
|
||||
&& !decode.early_stop
|
||||
&& (had_decode_errors || !matches.is_empty())
|
||||
{
|
||||
decode = decode::run_decode(path, &config.ffmpeg_path, ruleset, ScanDepth::Deep)?;
|
||||
decode = decode::run_decode(
|
||||
path,
|
||||
&config.ffmpeg_path,
|
||||
ruleset,
|
||||
ScanDepth::Deep,
|
||||
duration,
|
||||
progress,
|
||||
)?;
|
||||
matches = ruleset.match_lines(&decode.lines, &context);
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,15 @@ pub struct ScanOutcome {
|
||||
pub early_stop: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DecodeProgress {
|
||||
pub out_time: Option<f64>,
|
||||
pub duration: Option<f64>,
|
||||
pub percent: Option<f64>,
|
||||
pub speed: Option<f64>,
|
||||
pub done: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ScanRequest {
|
||||
pub path: PathBuf,
|
||||
|
||||
Reference in New Issue
Block a user