Frame’s batch processing system allows you to queue multiple conversion tasks and process them concurrently with full control over execution.
Task Queue System
The conversion manager implements a concurrent task queue with configurable concurrency limits:
Architecture (manager.rs:35-41):
/// Central coordinator for the conversion task queue.
///
/// All shared state is wrapped in `Arc` so the background queue loop and
/// the public API methods can touch it from different threads.
pub struct ConversionManager {
    /// Channel used to push `ManagerMessage`s into the queue loop.
    pub(crate) sender: mpsc::Sender<ManagerMessage>,
    /// Maximum number of tasks allowed to run at once (atomic, hot-swappable).
    max_concurrency: Arc<AtomicUsize>,
    /// Task id -> OS process id of the running worker (0 until registered).
    active_tasks: Arc<Mutex<HashMap<String, u32>>>,
    /// Ids flagged for cancellation before/while they run.
    cancelled_tasks: Arc<Mutex<HashSet<String>>>,
}
Components
Task Queue: FIFO queue (VecDeque) holding pending conversions
Active Tasks: HashMap tracking currently running tasks and their PIDs
Cancelled Tasks: HashSet marking tasks for cancellation
Message Channel: Async channel for queue management messages
Concurrency Control
Default Concurrency
Frame processes 2 tasks concurrently by default:
Configuration (types.rs:3):
/// Number of conversion tasks processed concurrently unless overridden
/// via `update_max_concurrency`.
pub const DEFAULT_MAX_CONCURRENCY: usize = 2;
Updating Max Concurrency
manager . update_max_concurrency ( 4 ) ? ; // Process up to 4 tasks simultaneously
Implementation (manager.rs:252-264):
/// Sets the maximum number of concurrently running tasks.
///
/// # Errors
/// Returns `InvalidInput` for `0`, which would stall the queue.
///
/// After storing the new limit, a `ConcurrencyUpdated` message is sent so
/// the queue loop re-evaluates whether additional tasks can be started.
pub fn update_max_concurrency(&self, value: usize) -> Result<(), ConversionError> {
    if value == 0 {
        return Err(ConversionError::InvalidInput(
            "Max concurrency must be at least 1".to_string(),
        ));
    }
    self.max_concurrency.store(value, Ordering::SeqCst);

    // Nudge the queue loop so newly freed slots are filled right away;
    // the send result is ignored if the loop has shut down.
    let tx = self.sender.clone();
    tauri::async_runtime::spawn(async move {
        let _ = tx.send(ManagerMessage::ConcurrencyUpdated).await;
    });
    Ok(())
}
Changing concurrency triggers immediate queue processing to start additional tasks if slots are available.
Querying Current Concurrency
let current = manager . current_max_concurrency ();
Implementation (manager.rs:248-250):
/// Returns the currently configured maximum concurrency.
pub fn current_max_concurrency(&self) -> usize {
    self.max_concurrency.load(Ordering::SeqCst)
}
Task States
Tasks progress through multiple states:
// Lifecycle states a file moves through in the conversion pipeline.
enum FileStatus {
  IDLE = 'IDLE',             // Not queued
  QUEUED = 'QUEUED',         // Waiting in queue
  CONVERTING = 'CONVERTING', // Currently processing
  PAUSED = 'PAUSED',         // Paused (process suspended)
  COMPLETED = 'COMPLETED',   // Successfully completed
  ERROR = 'ERROR'            // Failed with error
}
Type definition (types.ts:6-13):
/** Status values emitted to the UI for each tracked file. */
export enum FileStatus {
  IDLE = 'IDLE',
  QUEUED = 'QUEUED',
  CONVERTING = 'CONVERTING',
  PAUSED = 'PAUSED',
  COMPLETED = 'COMPLETED',
  ERROR = 'ERROR'
}
Queue Operations
Enqueueing Tasks
Tasks are added to the queue via message passing:
Implementation (manager.rs:60-82):
// Enqueue: admit a new task into the FIFO queue, then try to start work.
ManagerMessage :: Enqueue ( task ) => {
// Unbox the task payload.
let task = * task ;
{
// A fresh enqueue clears any stale cancellation flag for this id,
// so a previously cancelled id can be queued again.
let mut cancelled = cancelled_tasks_loop . lock () . unwrap ();
cancelled . remove ( & task . id);
}
// Drop duplicates: the id is already running or already waiting.
if running_tasks . contains_key ( & task . id) || queued_ids . contains ( & task . id) {
continue ; // Prevent duplicate tasks
}
queued_ids . insert ( task . id . clone ());
queue . push_back ( task );
// Start as many queued tasks as the concurrency limit allows.
ConversionManager :: process_queue (
& app ,
& tx_clone ,
& mut queue ,
& mut queued_ids ,
& mut running_tasks ,
Arc :: clone ( & limiter ),
Arc :: clone ( & cancelled_tasks_loop ),
) . await ;
}
Processing Queue
The queue processor starts tasks up to the concurrency limit:
Implementation (manager.rs:201-246):
/// Drains the pending queue, spawning a worker per task until either the
/// queue is empty or the number of running tasks reaches the concurrency
/// limit.
///
/// A task whose id is present in `cancelled_tasks` is dropped without
/// being started; the cancellation flag is consumed in the process.
async fn process_queue(
    app: &AppHandle,
    tx: &mpsc::Sender<ManagerMessage>,
    queue: &mut VecDeque<ConversionTask>,
    queued_ids: &mut HashSet<String>,
    running_tasks: &mut HashMap<String, ()>,
    max_concurrency: Arc<AtomicUsize>,
    cancelled_tasks: Arc<Mutex<HashSet<String>>>,
) {
    // Clamp a stored 0 to 1 defensively so the queue can never stall.
    let limit = max_concurrency.load(Ordering::SeqCst).max(1);
    while running_tasks.len() < limit {
        let Some(task) = queue.pop_front() else { break };
        queued_ids.remove(&task.id);

        // Consume any pending cancellation flag; skip the task if it was set.
        let was_cancelled = cancelled_tasks.lock().unwrap().remove(&task.id);
        if was_cancelled {
            continue;
        }

        running_tasks.insert(task.id.clone(), ());
        let app_handle = app.clone();
        let worker_tx = tx.clone();
        let worker_task = task.clone();
        tauri::async_runtime::spawn(async move {
            // Report the outcome back to the queue loop; either message
            // frees the slot and triggers the next round of processing.
            let outcome =
                run_ffmpeg_worker(app_handle, worker_tx.clone(), worker_task.clone()).await;
            let msg = match outcome {
                Ok(_) => ManagerMessage::TaskCompleted(worker_task.id),
                Err(e) => ManagerMessage::TaskError(worker_task.id, e),
            };
            let _ = worker_tx.send(msg).await;
        });
    }
}
Task Control Operations
Pause Task
Suspend a running task by sending SIGSTOP (Unix) or using NtSuspendProcess (Windows):
manager . pause_task ( & task_id ) ? ;
Unix implementation (manager.rs:266-288):
/// Suspends a running task's process by delivering SIGSTOP.
///
/// # Errors
/// `TaskNotFound` if the id is unknown, or if its PID is still 0 (the
/// worker has not registered a process id yet); `Shell` if the signal
/// could not be delivered.
#[cfg(unix)]
pub fn pause_task(&self, id: &str) -> Result<(), ConversionError> {
    let tasks = self.active_tasks.lock().unwrap();
    let pid = *tasks
        .get(id)
        .ok_or_else(|| ConversionError::TaskNotFound(id.to_string()))?;
    if pid == 0 {
        return Err(ConversionError::TaskNotFound(id.to_string()));
    }
    // SAFETY: `libc::kill` is a plain syscall; no pointers are involved.
    unsafe {
        if libc::kill(pid as libc::pid_t, libc::SIGSTOP) != 0 {
            return Err(ConversionError::Shell("Failed to send SIGSTOP".to_string()));
        }
    }
    Ok(())
}
Windows implementation (manager.rs:374-409):
// Suspends or resumes the target process via the ntdll entry points
// NtSuspendProcess / NtResumeProcess, resolved at runtime.
//
// # Safety
// The function transmutes a raw GetProcAddress result into a typed fn
// pointer, which is only sound if ntdll exports the expected
// `extern "system" fn(HANDLE) -> NTSTATUS` signature for these calls.
#[cfg(windows)]
unsafe fn windows_suspend_resume ( pid : u32 , suspend : bool ) -> Result <(), ConversionError > {
// Open the process with the minimal right needed for suspend/resume.
let process_handle = OpenProcess ( PROCESS_SUSPEND_RESUME , false , pid )
. map_err ( | e | ConversionError :: Shell ( format! ( "Failed to open process: {}" , e ))) ? ;
let ntdll = GetModuleHandleA ( s! ( "ntdll.dll" )) . map_err ( | e | {
// Close the handle before bailing so `?` does not leak it.
let _ = CloseHandle ( process_handle );
ConversionError :: Shell ( format! ( "Failed to get ntdll handle: {}" , e ))
}) ? ;
// Pick the suspend or resume entry point by name.
let fn_name = if suspend {
s! ( "NtSuspendProcess" )
} else {
s! ( "NtResumeProcess" )
};
let func_ptr = GetProcAddress ( ntdll , fn_name );
if let Some ( func ) = func_ptr {
// Reinterpret the raw address as a one-argument NTSTATUS-returning
// function taking the process handle.
let func : extern "system" fn ( HANDLE ) -> i32 = std :: mem :: transmute ( func );
let status = func ( process_handle );
let _ = CloseHandle ( process_handle );
// Any non-zero status is treated as a failure here.
if status != 0 {
return Err ( ConversionError :: Shell (
format! ( "NtSuspendProcess/NtResumeProcess failed with status: {}" , status )
));
}
Ok (())
} else {
// Entry point not found — release the handle and report.
let _ = CloseHandle ( process_handle );
Err ( ConversionError :: Shell (
"Could not find NtSuspendProcess/NtResumeProcess in ntdll" . to_string (),
))
}
}
Resume Task
Resume a paused task by sending SIGCONT (Unix) or using NtResumeProcess (Windows):
manager . resume_task ( & task_id ) ? ;
Unix implementation (manager.rs:290-314):
/// Resumes a previously paused task's process by delivering SIGCONT.
///
/// # Errors
/// `TaskNotFound` if the id is unknown or its PID is still 0 (worker not
/// yet registered); `Shell` if the signal could not be delivered.
#[cfg(unix)]
pub fn resume_task(&self, id: &str) -> Result<(), ConversionError> {
    let tasks = self.active_tasks.lock().unwrap();
    let pid = *tasks
        .get(id)
        .ok_or_else(|| ConversionError::TaskNotFound(id.to_string()))?;
    if pid == 0 {
        return Err(ConversionError::TaskNotFound(id.to_string()));
    }
    // SAFETY: `libc::kill` is a plain syscall; no pointers are involved.
    unsafe {
        if libc::kill(pid as libc::pid_t, libc::SIGCONT) != 0 {
            return Err(ConversionError::Shell("Failed to send SIGCONT".to_string()));
        }
    }
    Ok(())
}
Cancel Task
Terminate a running task and clean up temporary files:
manager . cancel_task ( & task_id ) ? ;
Implementation (manager.rs:316-333):
/// Cancels a task: flags it as cancelled (so a still-queued copy is
/// skipped), kills its process if one is running, and removes temporary
/// upscale files.
///
/// # Errors
/// Propagates `terminate_process` failures; note that in that case the
/// temp-dir cleanup is skipped, matching the original control flow.
pub fn cancel_task(&self, id: &str) -> Result<(), ConversionError> {
    // Flag first so the queue loop drops the task even if it has not
    // started yet.
    self.cancelled_tasks.lock().unwrap().insert(id.to_string());

    let tasks = self.active_tasks.lock().unwrap();
    if let Some(&pid) = tasks.get(id) {
        if pid > 0 {
            ConversionManager::terminate_process(pid)?;
        }
    }
    // Best-effort cleanup runs whether or not the task was active.
    ConversionManager::cleanup_temp_upscale_dir(id);
    Ok(())
}
Cleanup (manager.rs:335-340):
/// Best-effort removal of the temporary directory used for upscaling
/// intermediates (`<system temp dir>/frame_upscale_<id>`).
fn cleanup_temp_upscale_dir(id: &str) {
    let dir = std::env::temp_dir().join(format!("frame_upscale_{}", id));
    if dir.exists() {
        // Removal failure is deliberately ignored (cleanup is best-effort).
        let _ = std::fs::remove_dir_all(&dir);
    }
}
Unix termination (manager.rs:342-351):
/// Force-kills the worker process on Unix.
///
/// # Errors
/// `Shell` if the SIGKILL delivery fails.
#[cfg(unix)]
fn terminate_process(pid: u32) -> Result<(), ConversionError> {
    // SAFETY: `libc::kill` is a plain syscall; no pointers are involved.
    unsafe {
        // Best-effort SIGCONT first in case the process is currently
        // stopped; its result is intentionally ignored.
        let _ = libc::kill(pid as libc::pid_t, libc::SIGCONT);
        if libc::kill(pid as libc::pid_t, libc::SIGKILL) != 0 {
            return Err(ConversionError::Shell("Failed to send SIGKILL".to_string()));
        }
    }
    Ok(())
}
Windows termination (manager.rs:353-371):
// Forcefully terminates the worker process on Windows.
#[cfg(windows)]
fn terminate_process ( pid : u32 ) -> Result <(), ConversionError > {
// SAFETY: only FFI calls into Win32; the handle opened below is closed
// before returning.
unsafe {
// Best-effort resume first (suspend = false); result ignored.
let _ = windows_suspend_resume ( pid , false );
let process_handle = OpenProcess (
windows :: Win32 :: System :: Threading :: PROCESS_TERMINATE ,
false ,
pid ,
) . map_err ( | e | {
ConversionError :: Shell ( format! ( "Failed to open process for termination: {}" , e ))
}) ? ;
// Terminate with exit code 1; failure is deliberately ignored
// (`let _`), e.g. if the process already exited.
let _ = windows :: Win32 :: System :: Threading :: TerminateProcess ( process_handle , 1 );
let _ = CloseHandle ( process_handle );
}
Ok (())
}
Manager Messages
The queue communicates via async messages:
Message types (manager.rs:27-33):
/// Messages driving the queue-management loop.
pub enum ManagerMessage {
    /// Add a task (boxed payload) to the pending queue.
    Enqueue(Box<ConversionTask>),
    /// The concurrency limit changed; re-run queue processing.
    ConcurrencyUpdated,
    /// A worker started: task id plus its OS process id.
    TaskStarted(String, u32),
    /// A worker finished successfully (task id).
    TaskCompleted(String),
    /// A worker failed: task id plus the error.
    TaskError(String, ConversionError),
}
Message Flow
Enqueue: Add task to queue and trigger processing
TaskStarted: Register PID for process control
TaskCompleted: Remove from active tasks, process next in queue
TaskError: Emit error event, remove from active tasks, process next
ConcurrencyUpdated: Re-process queue with new limit
Task Lifecycle
Progress Tracking
The manager emits events for UI updates:
Event types (types.rs:183-209):
/// Per-task progress update emitted to the frontend (the UI renders the
/// value as a percentage).
#[derive(Clone, Serialize)]
pub struct ProgressPayload {
    pub id: String,
    pub progress: f64,
}

/// Emitted when a task's worker starts.
#[derive(Clone, Serialize)]
pub struct StartedPayload {
    pub id: String,
}

/// Emitted on successful completion, carrying the output file path.
#[derive(Clone, Serialize)]
pub struct CompletedPayload {
    pub id: String,
    pub output_path: String,
}

/// Emitted when a task fails, carrying the stringified error.
#[derive(Clone, Serialize)]
pub struct ErrorPayload {
    pub id: String,
    pub error: String,
}

/// A single log line from a task's worker, streamed to the UI.
#[derive(Clone, Serialize)]
pub struct LogPayload {
    pub id: String,
    pub line: String,
}
Event emission (manager.rs:149-166):
// Worker failure handling (excerpt): log to stderr, then notify the
// frontend via two events — a log line and a structured error payload.
ManagerMessage :: TaskError ( id , err ) => {
eprintln! ( "Task {} failed: {}" , id , err );
// Surface the error inside the task's log stream.
let _ = app . emit (
"conversion-log" ,
LogPayload {
id : id . clone (),
line : format! ( "[ERROR] {}" , err ),
},
);
// Dedicated error event for UI state handling.
let _ = app . emit (
"conversion-error" ,
ErrorPayload {
id : id . clone (),
error : err . to_string (),
},
);
// ... (task-tracking cleanup and queue re-processing follow)
}
Error Handling
When a task fails, it is removed from active tasks and the queue continues processing:
Error flow (manager.rs:149-188):
// Full worker-failure handling: after emitting events, the task is
// removed from all tracking collections and the queue is re-processed so
// the freed concurrency slot is reused immediately.
ManagerMessage :: TaskError ( id , err ) => {
eprintln! ( "Task {} failed: {}" , id , err );
let _ = app . emit ( "conversion-log" , LogPayload {
id : id . clone (),
line : format! ( "[ERROR] {}" , err ),
});
let _ = app . emit ( "conversion-error" , ErrorPayload {
id : id . clone (),
error : err . to_string (),
});
// Free the concurrency slot held by this task.
running_tasks . remove ( & id );
{
// Clear any cancellation flag so the id can be re-enqueued cleanly.
let mut cancelled = cancelled_tasks_loop . lock () . unwrap ();
cancelled . remove ( & id );
}
{
// Drop the PID registration for this task.
let mut tasks = active_tasks_loop . lock () . unwrap ();
tasks . remove ( & id );
}
// Start the next queued task(s), if any.
ConversionManager :: process_queue (
& app ,
& tx_clone ,
& mut queue ,
& mut queued_ids ,
& mut running_tasks ,
Arc :: clone ( & limiter ),
Arc :: clone ( & cancelled_tasks_loop ),
) . await ;
}
Failed tasks do not block the queue. Other tasks continue processing independently.
Thread Safety
The manager uses thread-safe primitives:
Arc&lt;AtomicUsize&gt;: Atomic concurrency limit shared across threads
Arc&lt;Mutex&lt;HashMap&lt;String, u32&gt;&gt;&gt;: Thread-safe active task registry (task id → PID)
Arc&lt;Mutex&lt;HashSet&lt;String&gt;&gt;&gt;: Thread-safe cancellation tracking
mpsc::Sender: Cloneable async channel sender for queue management messages
Performance Considerations
Concurrency Recommendations
Software encoding (libx264, libx265, vp9):
Set concurrency to number of CPU cores / 4
Example: 16-core CPU → max 4 concurrent tasks
Prevents CPU oversubscription and thermal throttling
Hardware encoding (NVENC, VideoToolbox):
Set concurrency to 2-4
GPU encoders have dedicated hardware units
Limited by VRAM and hardware encoder count
Combination of CPU and GPU tasks:
Set concurrency to 3-4
GPU tasks use encoder while CPU tasks use cores
Monitor system resources and adjust
Real-ESRGAN processing:
Set concurrency to 1-2
Very VRAM intensive (4-16GB per task)
Frame manages per-task thread concurrency automatically
Resource Management
Default conservative setting (2 concurrent tasks):
Prevents system overload
Ensures responsive UI
Safe for most hardware configurations
Increasing concurrency:
// For powerful workstations
manager . update_max_concurrency ( 4 ) ? ;
// For servers with multiple GPUs
manager . update_max_concurrency ( 8 ) ? ;
Example: Batch Processing Workflow
// End-to-end batch workflow: configure concurrency, enqueue several
// files, subscribe to progress/completion/error events, and exercise
// pause/resume/cancel on individual tasks.
import { conversionService } from '$lib/services/conversion' ;
// Prepare files
const files = [
{ id: '1' , path: '/path/to/video1.mp4' , config: preset1Config },
{ id: '2' , path: '/path/to/video2.mkv' , config: preset2Config },
{ id: '3' , path: '/path/to/video3.avi' , config: preset1Config },
];
// Set concurrency (up to 3 tasks run at once)
await conversionService . setMaxConcurrency ( 3 );
// Enqueue all tasks; excess tasks wait in the FIFO queue
for ( const file of files ) {
await conversionService . convert ( file . id , file . path , file . config );
}
// Listen for progress
conversionService . on ( 'progress' , ( payload ) => {
console . log ( `Task ${ payload . id } : ${ payload . progress } %` );
});
// Listen for completion
conversionService . on ( 'completed' , ( payload ) => {
console . log ( `Task ${ payload . id } completed: ${ payload . output_path } ` );
});
// Listen for errors
conversionService . on ( 'error' , ( payload ) => {
console . error ( `Task ${ payload . id } failed: ${ payload . error } ` );
});
// Pause a specific task
await conversionService . pauseTask ( '1' );
// Resume it later
await conversionService . resumeTask ( '1' );
// Cancel a task
await conversionService . cancelTask ( '2' );
Related Features
Video Conversion Configure individual task settings
Presets Apply consistent settings across batch
AI Upscaling Batch upscale multiple videos