Job Queue
MoneyPrinter uses a database-backed job queue for reliable, restart-safe video generation processing.
Why a Database Queue?
Traditional in-memory queues lose state on restart. MoneyPrinter’s database queue provides:
Durability Jobs persist across API/worker restarts
Observability Query job status and history with SQL
Concurrent workers Multiple workers can process jobs simultaneously
Cancellation Jobs can be cancelled mid-execution
Job Lifecycle
Jobs transition through five states:
State Transitions
| From | To | Trigger | Description |
| --- | --- | --- | --- |
| — | queued | API call | Job created, waiting for a worker |
| queued | running | Worker claims | Worker starts processing |
| queued | cancelled | Cancel request | Cancelled before a worker claimed it |
| running | completed | Pipeline success | Video generated successfully |
| running | failed | Pipeline error | Unrecoverable error occurred |
| running | cancelled | Cancel request | User cancelled during execution |
Creating Jobs
Jobs are created via the /api/generate API endpoint:
@app.route ( "/api/generate" , methods = [ "POST" ])
def generate ():
data = request.get_json() or {}
if not data.get( "videoSubject" ):
return jsonify({ "status" : "error" , "message" : "videoSubject is required." }), 400
with SessionLocal() as session:
job = create_job(session, payload = data)
return jsonify(
{
"status" : "success" ,
"message" : "Video generation queued." ,
"jobId" : job.id,
}
)
The create_job function:
def create_job(session: Session, payload: dict, max_attempts: int = 1) -> GenerationJob:
    """Persist a new queued GenerationJob, log its first event, and return it."""
    new_job = GenerationJob(
        id=str(uuid4()),
        status="queued",
        payload=payload,
        max_attempts=max_attempts,
        cancel_requested=False,
    )
    session.add(new_job)
    # Flush so the row exists before the event references job.id.
    session.flush()
    append_event(session, new_job.id, "queued", "info", "Job queued.")
    session.commit()
    session.refresh(new_job)
    return new_job
Job Payload
The job payload contains all parameters for video generation:
{
"videoSubject" : "Top 5 AI Tools" ,
"aiModel" : "llama3.1:8b" ,
"voice" : "en_us_001" ,
"paragraphNumber" : 1 ,
"customPrompt" : "" ,
"subtitlesPosition" : "center" ,
"color" : "#FFFF00" ,
"useMusic" : true ,
"automateYoutubeUpload" : false ,
"threads" : 2
}
Claiming Jobs (Worker)
Workers poll the database for queued jobs:
def process_next_job() -> bool:
    """Claim and run at most one queued job; True if one was processed."""
    with SessionLocal() as session:
        claimed = claim_next_queued_job(session)
        if not claimed:
            # Queue is empty — let the caller decide whether to sleep.
            return False
        # Process job...
        return True
def main() -> None:
    """Worker entry point: initialize, then poll the queue forever."""
    load_dotenv(ENV_FILE)
    check_env_vars()
    init_db()
    while True:
        # Only sleep when the queue was empty, so a busy queue drains fast.
        if not process_next_job():
            time.sleep(POLL_SECONDS)  # 1.0 second
Claiming Logic
The claim_next_queued_job function uses database-level locking:
def claim_next_queued_job(session: Session) -> Optional[GenerationJob]:
    """Claim the oldest queued job and transition it to 'running'.

    Returns the claimed GenerationJob, or None when no claimable job
    exists. On Postgres, FOR UPDATE SKIP LOCKED lets concurrent workers
    each claim a different row without blocking; on any other dialect
    (SQLite) a plain SELECT is used, which is only safe with one worker.
    """
    dialect = session.bind.dialect.name if session.bind else ""
    if dialect == "postgresql":
        # Postgres: use SKIP LOCKED for concurrent workers
        row = session.execute(
            text(
                """
                SELECT id
                FROM generation_jobs
                WHERE status = 'queued' AND cancel_requested = false
                ORDER BY created_at ASC
                FOR UPDATE SKIP LOCKED
                LIMIT 1
                """
            )
        ).first()
        if not row:
            return None
        # Re-fetch as an ORM object; the row lock is held until commit.
        job = get_job(session, row[0])
    else:
        # SQLite fallback (no SKIP LOCKED)
        stmt = (
            select(GenerationJob)
            .where(
                and_(
                    GenerationJob.status == "queued",
                    GenerationJob.cancel_requested.is_(False),
                )
            )
            .order_by(GenerationJob.created_at.asc())
            .limit(1)
        )
        job = session.scalars(stmt).first()
    if not job:
        return None
    # Mark as running
    job.status = "running"
    job.attempt_count = (job.attempt_count or 0) + 1
    job.started_at = utcnow()
    job.updated_at = utcnow()
    append_event(session, job.id, "running", "info", "Job started.")
    # Commit persists the claim and (on Postgres) releases the row lock.
    session.commit()
    session.refresh(job)
    return job
Postgres: FOR UPDATE SKIP LOCKED allows multiple workers to claim different jobs concurrently without blocking. SQLite: a single-worker setup is recommended (no row-level locking).
Event Logging
Events provide real-time progress updates:
def append_event(
    session: Session,
    job_id: str,
    event_type: str,
    level: str,
    message: str,
    payload: Optional[dict] = None,
) -> GenerationEvent:
    """Insert a GenerationEvent row for *job_id* and return it.

    The event is flushed but not committed; the caller owns the
    transaction boundary.
    """
    record = GenerationEvent(
        job_id=job_id,
        event_type=event_type,
        level=level,
        message=message,
        payload=payload,
    )
    session.add(record)
    session.flush()
    return record
Event Types
| Type | Level | Description |
| --- | --- | --- |
| queued | info | Job created |
| running | info | Worker started processing |
| log | info/warning/error | Pipeline progress message |
| cancel_requested | warning | User requested cancellation |
| cancelled | warning | Job cancelled |
| complete | success | Video generated |
| error | error | Pipeline failed |
Frontend Polling
The frontend polls for new events:
// Poll for events newer than `afterEventId`, render them, and return the
// cursor (last seen event ID) for the next poll.
async function pollJobEvents(jobId, afterEventId) {
  const response = await apiRequest(`/api/jobs/${jobId}/events?after=${afterEventId}`);
  if (response.status === 'success') {
    for (const event of response.events) {
      appendLog(event.message, event.level);
    }
    const count = response.events.length;
    if (count > 0) {
      return response.events[count - 1].id; // Next poll starts after this ID
    }
  }
  // No new events (or an error): keep the cursor where it was.
  return afterEventId;
}
Job Completion
Jobs reach a terminal state (completed, failed, or cancelled):
Success
def mark_completed(session: Session, job_id: str, result_path: str) -> None:
    """Move *job_id* to the terminal 'completed' state and log the result."""
    job = get_job(session, job_id)
    if job is None:
        # Unknown job — nothing to record.
        return
    job.status = "completed"
    job.result_path = result_path
    # Clear any error left over from a prior attempt.
    job.error_message = None
    job.completed_at = utcnow()
    job.updated_at = utcnow()
    append_event(
        session,
        job.id,
        "complete",
        "success",
        "Video generated successfully.",
        {"path": result_path},
    )
    session.commit()
Failure
def mark_failed(session: Session, job_id: str, error_message: str) -> None:
    """Move *job_id* to the terminal 'failed' state and log the error."""
    job = get_job(session, job_id)
    if job is None:
        return
    job.status = "failed"
    job.error_message = error_message
    job.completed_at = utcnow()
    job.updated_at = utcnow()
    append_event(session, job.id, "error", "error", error_message)
    session.commit()
Cancellation
def mark_cancelled(
    session: Session, job_id: str, reason: str = "Job cancelled."
) -> None:
    """Move *job_id* to the terminal 'cancelled' state and log *reason*."""
    job = get_job(session, job_id)
    if job is None:
        return
    job.status = "cancelled"
    job.completed_at = utcnow()
    job.updated_at = utcnow()
    append_event(session, job.id, "cancelled", "warning", reason)
    session.commit()
Cancellation
Jobs can be cancelled at any stage:
Requesting Cancellation
def request_cancel(session: Session, job_id: str) -> bool:
    """Flag *job_id* for cancellation.

    Returns False only when the job does not exist. Terminal jobs are a
    no-op that still returns True. Queued jobs are cancelled on the spot;
    running jobs are flagged so the worker stops at its next checkpoint.
    """
    job = get_job(session, job_id)
    if job is None:
        return False
    if job.status in ("completed", "failed", "cancelled"):
        return True  # Already terminal
    job.cancel_requested = True
    job.updated_at = utcnow()
    append_event(
        session, job.id, "cancel_requested", "warning", "Cancellation requested."
    )
    # Immediately cancel queued jobs
    if job.status == "queued":
        job.status = "cancelled"
        job.completed_at = utcnow()
        append_event(
            session, job.id, "cancelled", "warning", "Job cancelled before execution."
        )
    session.commit()
    return True
Checking Cancellation (Pipeline)
The worker checks cancellation status during processing:
def run_generation_pipeline(
    data: dict,
    is_cancelled,
    on_log,
    amount_of_stock_videos: int = 5,
) -> str:
    """Run the video-generation pipeline for one job payload.

    is_cancelled: optional zero-argument callable polled between stages;
    raises PipelineCancelled as soon as it reports True.
    on_log: progress-message callback (usage not shown in this excerpt).
    NOTE(review): excerpt is truncated (`# ...`); the return value is
    presumably the generated video path — confirm against the full source.
    """
    def guard_cancelled() -> None:
        # Abort cheaply at a checkpoint rather than mid-stage.
        if is_cancelled and is_cancelled():
            raise PipelineCancelled("Video generation was cancelled.")
    # Check before expensive operations
    guard_cancelled()
    script = generate_script(...)
    guard_cancelled()
    video_urls = search_for_stock_videos(...)
    guard_cancelled()
    # ...
Querying Jobs
Frontend and API query job state:
@app.route ( "/api/jobs/<job_id>" , methods = [ "GET" ])
def get_job_status ( job_id : str ):
with SessionLocal() as session:
job = get_job(session, job_id)
if not job:
return jsonify({ "status" : "error" , "message" : "Job not found." }), 404
return jsonify(
{
"status" : "success" ,
"job" : {
"id" : job.id,
"state" : job.status,
"cancelRequested" : job.cancel_requested,
"resultPath" : job.result_path,
"errorMessage" : job.error_message,
"createdAt" : job.created_at.isoformat() if job.created_at else None ,
"startedAt" : job.started_at.isoformat() if job.started_at else None ,
"completedAt" : job.completed_at.isoformat()
if job.completed_at
else None ,
},
}
)
Scaling Considerations
Multiple Workers
Run multiple workers for higher throughput:
# Terminal 1
uv run python Backend/worker.py
# Terminal 2
uv run python Backend/worker.py
# Terminal 3
uv run python Backend/worker.py
Or with Docker:
docker compose up --scale worker=5
Multiple workers require Postgres with SKIP LOCKED. SQLite does not support concurrent claims.
Queue Metrics
Query queue depth and worker utilization:
-- Queued jobs
SELECT COUNT ( * ) FROM generation_jobs WHERE status = 'queued' ;
-- Running jobs
SELECT COUNT ( * ) FROM generation_jobs WHERE status = 'running' ;
-- Average completion time
SELECT AVG (EXTRACT(EPOCH FROM (completed_at - created_at))) as avg_seconds
FROM generation_jobs
WHERE status = 'completed'
AND completed_at > NOW () - INTERVAL '1 hour' ;
Future Enhancements
Retry logic : Automatic retry with exponential backoff
Priority queue : High-priority jobs skip to front
Dead-letter queue : Archive permanently failed jobs
Job expiration : Auto-cancel jobs older than N hours
Batch processing : Process multiple jobs per worker cycle
Next Steps
Architecture System design overview
Pipeline Video generation pipeline
Generating Videos Create videos through UI and API
Docker Setup Deploy with multiple workers