Skip to main content
Frame’s batch processing system allows you to queue multiple conversion tasks and process them concurrently with full control over execution.

Task Queue System

The conversion manager implements a concurrent task queue with configurable concurrency limits: Architecture (manager.rs:35-41):
pub struct ConversionManager {
    pub(crate) sender: mpsc::Sender<ManagerMessage>,
    max_concurrency: Arc<AtomicUsize>,
    active_tasks: Arc<Mutex<HashMap<String, u32>>>,
    cancelled_tasks: Arc<Mutex<HashSet<String>>>,
}

Components

  • Task Queue: FIFO queue (VecDeque) holding pending conversions
  • Active Tasks: HashMap tracking currently running tasks and their PIDs
  • Cancelled Tasks: HashSet marking tasks for cancellation
  • Message Channel: Async channel for queue management messages

Concurrency Control

Default Concurrency

Frame processes 2 tasks concurrently by default: Configuration (types.rs:3):
pub const DEFAULT_MAX_CONCURRENCY: usize = 2;

Updating Max Concurrency

manager.update_max_concurrency(4)?; // Process up to 4 tasks simultaneously
Implementation (manager.rs:252-264):
pub fn update_max_concurrency(&self, value: usize) -> Result<(), ConversionError> {
    if value == 0 {
        return Err(ConversionError::InvalidInput(
            "Max concurrency must be at least 1".to_string(),
        ));
    }
    self.max_concurrency.store(value, Ordering::SeqCst);
    let tx = self.sender.clone();
    tauri::async_runtime::spawn(async move {
        let _ = tx.send(ManagerMessage::ConcurrencyUpdated).await;
    });
    Ok(())
}
Changing concurrency triggers immediate queue processing to start additional tasks if slots are available.

Querying Current Concurrency

let current = manager.current_max_concurrency();
Implementation (manager.rs:248-250):
pub fn current_max_concurrency(&self) -> usize {
    self.max_concurrency.load(Ordering::SeqCst)
}

Task States

Tasks progress through multiple states:
enum FileStatus {
  IDLE = 'IDLE',           // Not queued
  QUEUED = 'QUEUED',       // Waiting in queue
  CONVERTING = 'CONVERTING', // Currently processing
  PAUSED = 'PAUSED',       // Paused (process suspended)
  COMPLETED = 'COMPLETED', // Successfully completed
  ERROR = 'ERROR'          // Failed with error
}
Type definition (types.ts:6-13):
export enum FileStatus {
  IDLE = 'IDLE',
  QUEUED = 'QUEUED',
  CONVERTING = 'CONVERTING',
  PAUSED = 'PAUSED',
  COMPLETED = 'COMPLETED',
  ERROR = 'ERROR'
}

Queue Operations

Enqueueing Tasks

Tasks are added to the queue via message passing: Implementation (manager.rs:60-82):
ManagerMessage::Enqueue(task) => {
    let task = *task;
    {
        let mut cancelled = cancelled_tasks_loop.lock().unwrap();
        cancelled.remove(&task.id);
    }

    if running_tasks.contains_key(&task.id) || queued_ids.contains(&task.id) {
        continue; // Prevent duplicate tasks
    }

    queued_ids.insert(task.id.clone());
    queue.push_back(task);
    ConversionManager::process_queue(
        &app,
        &tx_clone,
        &mut queue,
        &mut queued_ids,
        &mut running_tasks,
        Arc::clone(&limiter),
        Arc::clone(&cancelled_tasks_loop),
    ).await;
}

Processing Queue

The queue processor starts tasks up to the concurrency limit: Implementation (manager.rs:201-246):
async fn process_queue(
    app: &AppHandle,
    tx: &mpsc::Sender<ManagerMessage>,
    queue: &mut VecDeque<ConversionTask>,
    queued_ids: &mut HashSet<String>,
    running_tasks: &mut HashMap<String, ()>,
    max_concurrency: Arc<AtomicUsize>,
    cancelled_tasks: Arc<Mutex<HashSet<String>>>,
) {
    let limit = max_concurrency.load(Ordering::SeqCst).max(1);

    while running_tasks.len() < limit {
        if let Some(task) = queue.pop_front() {
            queued_ids.remove(&task.id);
            let is_cancelled = {
                let mut cancelled = cancelled_tasks.lock().unwrap();
                cancelled.remove(&task.id)
            };
            if is_cancelled {
                continue;
            }

            running_tasks.insert(task.id.clone(), ());

            let app_clone = app.clone();
            let tx_worker = tx.clone();
            let task_clone = task.clone();

            tauri::async_runtime::spawn(async move {
                if let Err(e) = run_ffmpeg_worker(app_clone, tx_worker.clone(), task_clone.clone()).await {
                    let _ = tx_worker.send(ManagerMessage::TaskError(task_clone.id, e)).await;
                } else {
                    let _ = tx_worker.send(ManagerMessage::TaskCompleted(task_clone.id)).await;
                }
            });
        } else {
            break;
        }
    }
}

Task Control Operations

Pause Task

Suspend a running task by sending SIGSTOP (Unix) or using NtSuspendProcess (Windows):
manager.pause_task(&task_id)?;
Unix implementation (manager.rs:266-288):
#[cfg(unix)]
pub fn pause_task(&self, id: &str) -> Result<(), ConversionError> {
    let tasks = self.active_tasks.lock().unwrap();
    if let Some(&pid) = tasks.get(id) {
        if pid == 0 {
            return Err(ConversionError::TaskNotFound(id.to_string()));
        }

        unsafe {
            if libc::kill(pid as libc::pid_t, libc::SIGSTOP) != 0 {
                return Err(ConversionError::Shell("Failed to send SIGSTOP".to_string()));
            }
        }
        Ok(())
    } else {
        Err(ConversionError::TaskNotFound(id.to_string()))
    }
}
Windows implementation (manager.rs:374-409):
#[cfg(windows)]
unsafe fn windows_suspend_resume(pid: u32, suspend: bool) -> Result<(), ConversionError> {
    let process_handle = OpenProcess(PROCESS_SUSPEND_RESUME, false, pid)
        .map_err(|e| ConversionError::Shell(format!("Failed to open process: {}", e)))?;

    let ntdll = GetModuleHandleA(s!("ntdll.dll")).map_err(|e| {
        let _ = CloseHandle(process_handle);
        ConversionError::Shell(format!("Failed to get ntdll handle: {}", e))
    })?;

    let fn_name = if suspend {
        s!("NtSuspendProcess")
    } else {
        s!("NtResumeProcess")
    };

    let func_ptr = GetProcAddress(ntdll, fn_name);

    if let Some(func) = func_ptr {
        let func: extern "system" fn(HANDLE) -> i32 = std::mem::transmute(func);
        let status = func(process_handle);
        let _ = CloseHandle(process_handle);

        if status != 0 {
            return Err(ConversionError::Shell(
                format!("NtSuspendProcess/NtResumeProcess failed with status: {}", status)
            ));
        }
        Ok(())
    } else {
        let _ = CloseHandle(process_handle);
        Err(ConversionError::Shell(
            "Could not find NtSuspendProcess/NtResumeProcess in ntdll".to_string(),
        ))
    }
}

Resume Task

Resume a paused task by sending SIGCONT (Unix) or using NtResumeProcess (Windows):
manager.resume_task(&task_id)?;
Unix implementation (manager.rs:290-314):
#[cfg(unix)]
pub fn resume_task(&self, id: &str) -> Result<(), ConversionError> {
    let tasks = self.active_tasks.lock().unwrap();
    if let Some(&pid) = tasks.get(id) {
        if pid == 0 {
            return Err(ConversionError::TaskNotFound(id.to_string()));
        }

        unsafe {
            if libc::kill(pid as libc::pid_t, libc::SIGCONT) != 0 {
                return Err(ConversionError::Shell("Failed to send SIGCONT".to_string()));
            }
        }
        Ok(())
    } else {
        Err(ConversionError::TaskNotFound(id.to_string()))
    }
}

Cancel Task

Terminate a running task and clean up temporary files:
manager.cancel_task(&task_id)?;
Implementation (manager.rs:316-333):
pub fn cancel_task(&self, id: &str) -> Result<(), ConversionError> {
    {
        let mut cancelled = self.cancelled_tasks.lock().unwrap();
        cancelled.insert(id.to_string());
    }

    let tasks = self.active_tasks.lock().unwrap();
    if let Some(&pid) = tasks.get(id) {
        if pid > 0 {
            ConversionManager::terminate_process(pid)?;
        }
        ConversionManager::cleanup_temp_upscale_dir(id);
        Ok(())
    } else {
        ConversionManager::cleanup_temp_upscale_dir(id);
        Ok(())
    }
}
Cleanup (manager.rs:335-340):
fn cleanup_temp_upscale_dir(id: &str) {
    let temp_dir = std::env::temp_dir().join(format!("frame_upscale_{}", id));
    if temp_dir.exists() {
        let _ = std::fs::remove_dir_all(&temp_dir);
    }
}
Unix termination (manager.rs:342-351):
#[cfg(unix)]
fn terminate_process(pid: u32) -> Result<(), ConversionError> {
    unsafe {
        let _ = libc::kill(pid as libc::pid_t, libc::SIGCONT);
        if libc::kill(pid as libc::pid_t, libc::SIGKILL) != 0 {
            return Err(ConversionError::Shell("Failed to send SIGKILL".to_string()));
        }
    }
    Ok(())
}
Windows termination (manager.rs:353-371):
#[cfg(windows)]
fn terminate_process(pid: u32) -> Result<(), ConversionError> {
    unsafe {
        let _ = windows_suspend_resume(pid, false);

        let process_handle = OpenProcess(
            windows::Win32::System::Threading::PROCESS_TERMINATE,
            false,
            pid,
        ).map_err(|e| {
            ConversionError::Shell(format!("Failed to open process for termination: {}", e))
        })?;

        let _ = windows::Win32::System::Threading::TerminateProcess(process_handle, 1);
        let _ = CloseHandle(process_handle);
    }
    Ok(())
}

Manager Messages

The queue communicates via async messages: Message types (manager.rs:27-33):
pub enum ManagerMessage {
    Enqueue(Box<ConversionTask>),
    ConcurrencyUpdated,
    TaskStarted(String, u32),
    TaskCompleted(String),
    TaskError(String, ConversionError),
}

Message Flow

  1. Enqueue: Add task to queue and trigger processing
  2. TaskStarted: Register PID for process control
  3. TaskCompleted: Remove from active tasks, process next in queue
  4. TaskError: Emit error event, remove from active tasks, process next
  5. ConcurrencyUpdated: Re-process queue with new limit

Task Lifecycle

Progress Tracking

The manager emits events for UI updates: Event types (types.rs:183-209):
#[derive(Clone, Serialize)]
pub struct ProgressPayload {
    pub id: String,
    pub progress: f64,
}

#[derive(Clone, Serialize)]
pub struct StartedPayload {
    pub id: String,
}

#[derive(Clone, Serialize)]
pub struct CompletedPayload {
    pub id: String,
    pub output_path: String,
}

#[derive(Clone, Serialize)]
pub struct ErrorPayload {
    pub id: String,
    pub error: String,
}

#[derive(Clone, Serialize)]
pub struct LogPayload {
    pub id: String,
    pub line: String,
}
Event emission (manager.rs:149-166):
ManagerMessage::TaskError(id, err) => {
    eprintln!("Task {} failed: {}", id, err);

    let _ = app.emit(
        "conversion-log",
        LogPayload {
            id: id.clone(),
            line: format!("[ERROR] {}", err),
        },
    );

    let _ = app.emit(
        "conversion-error",
        ErrorPayload {
            id: id.clone(),
            error: err.to_string(),
        },
    );
    // ...
}

Error Handling

When a task fails, it is removed from active tasks and the queue continues processing: Error flow (manager.rs:149-188):
ManagerMessage::TaskError(id, err) => {
    eprintln!("Task {} failed: {}", id, err);

    let _ = app.emit("conversion-log", LogPayload {
        id: id.clone(),
        line: format!("[ERROR] {}", err),
    });

    let _ = app.emit("conversion-error", ErrorPayload {
        id: id.clone(),
        error: err.to_string(),
    });

    running_tasks.remove(&id);
    {
        let mut cancelled = cancelled_tasks_loop.lock().unwrap();
        cancelled.remove(&id);
    }
    {
        let mut tasks = active_tasks_loop.lock().unwrap();
        tasks.remove(&id);
    }

    ConversionManager::process_queue(
        &app,
        &tx_clone,
        &mut queue,
        &mut queued_ids,
        &mut running_tasks,
        Arc::clone(&limiter),
        Arc::clone(&cancelled_tasks_loop),
    ).await;
}
Failed tasks do not block the queue. Other tasks continue processing independently.

Thread Safety

The manager uses thread-safe primitives:
  • Arc AtomicUsize: Atomic concurrency limit shared across threads
  • Arc Mutex HashMap: Thread-safe active task registry
  • Arc Mutex HashSet: Thread-safe cancellation tracking
  • mpsc::Sender: Lock-free async message passing

Performance Considerations

Concurrency Recommendations

Software encoding (libx264, libx265, vp9):
  • Set concurrency to number of CPU cores / 4
  • Example: 16-core CPU → max 4 concurrent tasks
  • Prevents CPU oversubscription and thermal throttling
Hardware encoding (NVENC, VideoToolbox):
  • Set concurrency to 2-4
  • GPU encoders have dedicated hardware units
  • Limited by VRAM and hardware encoder count
Combination of CPU and GPU tasks:
  • Set concurrency to 3-4
  • GPU tasks use encoder while CPU tasks use cores
  • Monitor system resources and adjust
Real-ESRGAN processing:
  • Set concurrency to 1-2
  • Very VRAM intensive (4-16GB per task)
  • Frame manages per-task thread concurrency automatically

Resource Management

Default conservative setting (2 concurrent tasks):
  • Prevents system overload
  • Ensures responsive UI
  • Safe for most hardware configurations
Increasing concurrency:
// For powerful workstations
manager.update_max_concurrency(4)?;

// For servers with multiple GPUs
manager.update_max_concurrency(8)?;

Example: Batch Processing Workflow

import { conversionService } from '$lib/services/conversion';

// Prepare files
const files = [
  { id: '1', path: '/path/to/video1.mp4', config: preset1Config },
  { id: '2', path: '/path/to/video2.mkv', config: preset2Config },
  { id: '3', path: '/path/to/video3.avi', config: preset1Config },
];

// Set concurrency
await conversionService.setMaxConcurrency(3);

// Enqueue all tasks
for (const file of files) {
  await conversionService.convert(file.id, file.path, file.config);
}

// Listen for progress
conversionService.on('progress', (payload) => {
  console.log(`Task ${payload.id}: ${payload.progress}%`);
});

// Listen for completion
conversionService.on('completed', (payload) => {
  console.log(`Task ${payload.id} completed: ${payload.output_path}`);
});

// Listen for errors
conversionService.on('error', (payload) => {
  console.error(`Task ${payload.id} failed: ${payload.error}`);
});

// Pause a specific task
await conversionService.pauseTask('1');

// Resume it later
await conversionService.resumeTask('1');

// Cancel a task
await conversionService.cancelTask('2');

Video Conversion

Configure individual task settings

Presets

Apply consistent settings across batch

AI Upscaling

Batch upscale multiple videos

Build docs developers (and LLMs) love