Real-time output monitoring with CommandMonitor and CommandMonitorServer
libffmpeg provides sophisticated output monitoring capabilities through the CommandMonitor, CommandMonitorServer, and CommandMonitorClient types from the libcmd crate. These enable real-time streaming of stdout and stderr, progress tracking, and bidirectional communication.
Receives stdout/stderr messages and can send stdin commands (in graceful mode).From ~/workspace/source/libffmpeg/examples/transcode_with_progress.rs:36:
Accumulates progress data line by line.From ~/workspace/source/libffmpeg/src/ffmpeg/progress.rs:13-27:
/// Accumulator for parsing ffmpeg's `-progress pipe:1` output line by line.////// Feed lines via [`with_line`](Self::with_line), then call [`finish`](Self::finish)/// once a complete progress block has been received to produce a [`Progress`] snapshot.////// ```no_run/// # use libffmpeg::ffmpeg::progress::PartialProgress;/// let mut partial = PartialProgress::default();/// partial.with_line("frame=120");/// partial.with_line("fps=30.00");/// partial.with_line("progress=continue");/// if let Some(progress) = partial.finish() {/// println!("frame={} fps={}", progress.frame, progress.fps);/// }/// ```
A complete progress snapshot.From ~/workspace/source/libffmpeg/src/ffmpeg/progress.rs:152-174:
/// A parsed progress snapshot from ffmpeg's `-progress pipe:1` output.#[derive(Debug, Clone, Serialize, Deserialize)]#[serde(rename_all = "camelCase")]pub struct Progress { /// Number of frames processed so far. pub frame: usize, /// Current encoding speed in frames per second. pub fps: f64, /// Current bitrate in bytes per second. pub bitrate: isize, /// Total output size in bytes. pub total_size: usize, /// Elapsed output time (position in the output stream). pub out_time: Duration, /// Number of duplicated frames. pub dup_frames: usize, /// Number of dropped frames. pub drop_frames: usize, /// Encoding speed as a multiplier of realtime (e.g. 2.0 = 2x realtime). pub speed: f64, /// Whether ffmpeg is still processing or has finished. pub progress: ProgressState,}
From ~/workspace/source/libffmpeg/examples/transcode_with_progress.rs:28-31,54-55:
let video_duration = libffmpeg::util::get_duration(&args.input, root_token.child_token()).await?;let total = args.time.unwrap_or(video_duration.as_secs()) as f64;// Later, in the monitor loop:100f64 * update.out_time.as_secs_f64() / total,
To calculate percentage:
Get the video duration using libffmpeg::util::get_duration()
Too small - The ffmpeg process may block waiting for your code to consume messages
Too large - More memory usage, potential delays in processing
Recommended - 50-200 for most use cases
If ffmpeg produces output very quickly (e.g., with high verbosity), increase the capacity to prevent backpressure. If you’re processing messages slowly (e.g., writing to disk), use a larger buffer.
let mut progress = PartialProgress::default();if progress.with_line(&line) { if let Some(update) = progress.finish() { // Valid progress update println!("Progress: {}%", (update.out_time.as_secs_f64() / total) * 100.0); } else { // Incomplete progress block, keep accumulating }} else { // Not a progress line, handle as regular output println!("Output: {}", line);}
From ~/workspace/source/libffmpeg/src/ffmpeg/progress.rs:112-134:
/// Attempt to produce a complete [`Progress`] snapshot from the accumulated state.////// Returns `None` if no `progress=` line has been received yet, or if the/// bitrate value could not be parsed.#[must_use]pub fn finish(&self) -> Option<Progress> { let progress = match &self.progress { PartialProgressState::Unset => return None, PartialProgressState::Continue => ProgressState::Continue, PartialProgressState::End => ProgressState::End, PartialProgressState::Unknown(v) => ProgressState::Unknown(v.clone()), }; let num_part = self.bitrate.split("kbits").next().unwrap_or("0"); let kbitsf = num_part.parse::<f32>().ok()?; let bitrate = (kbitsf * 1024.0) as isize; Some(Progress { frame: self.frame, fps: self.fps, bitrate, total_size: self.total_size, out_time: Duration::from_micros(self.out_time_us as u64), dup_frames: self.dup_frames, drop_frames: self.drop_frames, speed: self.speed.trim_end_matches('x').parse().unwrap_or_default(), progress, })}
#[tokio::test]async fn test_monitoring() { let token = CancellationToken::new(); let exit_token = token.child_token(); let monitor = CommandMonitor::with_capacity(100); let mut messages = Vec::new(); let monitor_task = { let mut client = monitor.client.clone(); let exit_token = exit_token.clone(); tokio::spawn(async move { let mut msgs = Vec::new(); while let Some(Some(message)) = client.recv().with_cancellation_token(&exit_token).await { msgs.push(message); } msgs }) }; let result = ffmpeg(token, &monitor.server, |cmd| { cmd.arg("-version"); }).await; exit_token.cancel(); let messages = monitor_task.await.unwrap(); assert!(result.is_ok()); assert!(!messages.is_empty()); // Should have received some output}