Overview
Aeros provides a robust queue system for processing background jobs asynchronously. The system uses Redis for reliable job storage and supports both blocking and polling worker modes.
Creating Jobs
All jobs must extend the Job abstract class and implement the doWork() method:
app/Jobs/SendEmailJob.php
<?php

use Aeros\Src\Classes\Job;

class SendEmailJob extends Job
{
    public string $email;
    public string $subject;
    public string $body;

    public function __construct(string $email, string $subject, string $body)
    {
        parent::__construct(); // Generates UUID

        $this->email = $email;
        $this->subject = $subject;
        $this->body = $body;
    }

    public function doWork(): bool
    {
        try {
            // mail() reports failure by returning false rather than throwing
            if (!mail($this->email, $this->subject, $this->body)) {
                logger()->log("Email job failed: mail() returned false");
                return false; // Job failed, will be retried
            }

            return true; // Job succeeded
        } catch (\Exception $e) {
            logger()->log("Email job failed: " . $e->getMessage());
            return false; // Job failed, will be retried
        }
    }
}
Each job automatically receives a unique UUID when constructed, enabling tracking and preventing duplicate processing.
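The source does not show how the UUID is produced. As a rough illustration only, a random (version 4) UUID can be built from 16 random bytes as below. The function name generateUuidV4() is hypothetical and is not part of the Aeros API.

```php
<?php
// Hypothetical helper: a sketch of UUID v4 generation, not the Aeros implementation.
function generateUuidV4(): string
{
    $bytes = random_bytes(16);

    // Set the version nibble to 4 and the variant bits to the RFC 4122 layout.
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

    // 32 hex characters split into 8 groups of 4, joined as 8-4-4-4-12.
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

$uuidA = generateUuidV4();
$uuidB = generateUuidV4();
```

Because the identifier is random rather than derived from the job's contents, two jobs with identical arguments still get different UUIDs.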
Dispatching Jobs
Push Single Job
// Push a job instance
$job = new SendEmailJob('[email protected]', 'Welcome', 'Hello!');
queue()->push($job);

// Push a job class (will be instantiated)
queue()->push(SendEmailJob::class);
Push Multiple Jobs
$jobs = [
    new SendEmailJob('[email protected]', 'Subject 1', 'Body 1'),
    new SendEmailJob('[email protected]', 'Subject 2', 'Body 2'),
    new ProcessDataJob($data),
];

queue()->push($jobs);
Custom Pipeline
Organize jobs into different pipelines (queues):
// Default pipeline: {APP_NAME}_gx_pipelines
queue()->push($job);

// Custom pipeline
queue()->push($job, 'emails');
queue()->push($job, 'reports');
The queue system prevents duplicate jobs by checking if a job of the same class already exists in the pipeline. If found, the new job won’t be added.
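To make the class-based check concrete, here is a minimal sketch that uses an in-memory array in place of Redis. InMemoryPipeline, ExampleEmailJob, and ExampleReportJob are hypothetical names for illustration; the actual Aeros check may be implemented differently.

```php
<?php
// In-memory stand-in for a single pipeline (assumption: the real one lives in Redis).
final class InMemoryPipeline
{
    /** @var object[] Jobs currently waiting in the pipeline */
    private array $jobs = [];

    // Returns true if the job was added, false if a job of the same class is already queued.
    public function push(object $job): bool
    {
        foreach ($this->jobs as $queued) {
            if (get_class($queued) === get_class($job)) {
                return false;
            }
        }

        $this->jobs[] = $job;
        return true;
    }

    public function count(): int
    {
        return count($this->jobs);
    }
}

// Hypothetical job classes used only for this illustration.
final class ExampleEmailJob {}
final class ExampleReportJob {}

$pipeline = new InMemoryPipeline();
$first  = $pipeline->push(new ExampleEmailJob());  // added
$second = $pipeline->push(new ExampleEmailJob());  // skipped: same class already queued
$third  = $pipeline->push(new ExampleReportJob()); // added: different class
```

If the check works this way, a second job of the same class is dropped even when its constructor arguments differ, so it is worth confirming the behavior before queuing many jobs of one class into the same pipeline.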
Processing Jobs
Blocking Worker (Recommended)
Blocking workers wait for jobs without polling, providing better performance and lower latency:
// Process jobs indefinitely
while (true) {
    // Blocks for up to 5 seconds waiting for a job
    queue()->processPipelineBlocking('*', 5);
}

# Run the worker
php worker.php
Custom Blocking Configuration
// Block for 10 seconds
queue()->processPipelineBlocking('emails', 10);

// Infinite blocking (wait forever)
queue()->processPipelineBlocking('*', 0);

// Process multiple pipelines
queue()->processPipelineBlocking(['emails', 'reports'], 5);
Manual Job Processing
For fine-grained control, manually pop and process jobs:
// Blocking pop (waits for a job)
$job = queue()->blockingPop('*', 5);

if ($job) {
    $result = $job->doWork();

    if ($result) {
        logger()->log("Job {$job->uuid} completed");
    } else {
        logger()->log("Job {$job->uuid} failed");
        // Failed jobs are automatically re-queued
    }
}
Non-Blocking Pop
Pop a job immediately without waiting:
$job = queue()->pop('emails');

if ($job) {
    $job->doWork();
}
Polling Worker (Legacy)
Polling workers are less efficient than blocking workers and should be avoided for production use. Use processPipelineBlocking() instead.
// Polls the queue (not recommended)
while (true) {
    queue()->processPipeline('*');
    sleep(1); // Prevent CPU overuse
}
Job States & Locking
The queue system manages three job states:
locked - Job is currently being processed
completed - Job finished successfully
failed - Job failed and was re-queued
Job Locking
Jobs are automatically locked during processing to prevent duplicate execution:
// Job is locked with a 10-second timeout
queue()->processPipelineBlocking();

// If the job takes longer than 10 seconds, the lock expires
// and another worker can pick it up
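The expiring-lock behavior described above can be sketched with an in-memory store and an explicit clock value. ExpiringLockStore and its methods are hypothetical names for illustration; the source does not show how Aeros implements locking. In Redis, this pattern is commonly built on SET with the NX and EX options, though whether Aeros does so is not stated.

```php
<?php
// In-memory stand-in for a lock store with expiry (not the Aeros implementation).
final class ExpiringLockStore
{
    /** @var array<string,int> Job UUID => timestamp at which the lock expires */
    private array $locks = [];

    // Try to lock a job for $ttl seconds; fails while an unexpired lock exists.
    public function acquire(string $uuid, int $now, int $ttl = 10): bool
    {
        if (isset($this->locks[$uuid]) && $this->locks[$uuid] > $now) {
            return false;
        }

        $this->locks[$uuid] = $now + $ttl;
        return true;
    }

    // Release the lock early, for example when the job finishes.
    public function release(string $uuid): void
    {
        unset($this->locks[$uuid]);
    }
}

$locks = new ExpiringLockStore();
$workerA      = $locks->acquire('job-1', 100); // worker A takes the lock at t=100
$workerB      = $locks->acquire('job-1', 105); // refused: lock is valid until t=110
$workerBLater = $locks->acquire('job-1', 111); // granted: the lock has expired
```

The last call shows the risk noted above: a job that runs longer than the lock timeout can be picked up by a second worker while the first is still working on it, which is one reason to keep jobs short or idempotent.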
Check Job Status
// Get completed jobs
$completedJobs = queue()->getJobStatus('emails', Queue::COMPLETED_STATE);

foreach ($completedJobs as $job) {
    echo "Job {$job['uuid']} completed at {$job['timestamp']}\n";
}

// Get failed jobs
$failedJobs = queue()->getJobStatus('emails', Queue::FAILED_STATE);
Clear Job Status
// Clear completed jobs from all pipelines
queue()->clearJobStatus(Queue::COMPLETED_STATE, '*');

// Clear failed jobs from a specific pipeline
queue()->clearJobStatus(Queue::FAILED_STATE, 'emails');

// Clear all statuses
queue()->clearJobStatus('*', '*');
Pipeline Management
Default Pipeline
By default, jobs are pushed to {APP_NAME}_gx_pipelines:
// These are equivalent
queue()->push($job);
queue()->push($job, '*');
queue()->push($job, 'all');
Multiple Pipelines
Process jobs from multiple pipelines:
// Process several named pipelines with one worker
queue()->processPipelineBlocking(['emails', 'reports', 'notifications'], 5);
Example: Email Queue Worker
#!/usr/bin/env php
<?php

require __DIR__ . '/bootstrap.php';

echo "Email worker started...\n";

while (true) {
    try {
        // Process email jobs with a 5-second timeout
        $result = queue()->processPipelineBlocking('emails', 5);

        if ($result === false) {
            // No jobs available (timeout reached)
            continue;
        }

        echo "Processed email job\n";
    } catch (\Exception $e) {
        logger()->log("Worker error: " . $e->getMessage());
        sleep(5); // Wait before retrying
    }
}
Run the worker script from the command line with the PHP CLI.
Example: Multi-Pipeline Worker
#!/usr/bin/env php
<?php

require __DIR__ . '/bootstrap.php';

$pipelines = ['emails', 'reports', 'notifications'];

echo "Multi-pipeline worker started...\n";

while (true) {
    try {
        queue()->processPipelineBlocking($pipelines, 5);
    } catch (\Exception $e) {
        logger()->log("Worker error: " . $e->getMessage());
        sleep(5); // Wait before retrying so a persistent error does not spin the loop
    }
}
Best Practices
Use Blocking Workers: Always prefer processPipelineBlocking() over processPipeline() for better performance and lower resource usage.
Idempotent Jobs: Design jobs to be idempotent so they can be safely retried if they fail.
Return Boolean: Always return true for success and false for failure in doWork(). Failed jobs are automatically re-queued.
Handle Exceptions: Catch exceptions in doWork() and return false to trigger retry logic.
Monitor Job Status: Regularly check and clear completed/failed job statuses to prevent memory bloat.
Separate Pipelines: Use different pipelines for different job types to enable prioritization and dedicated workers.
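As an illustration of the idempotency advice, the sketch below skips the side effect when an earlier attempt has already completed it. ChargeOrderJob and its static arrays are hypothetical. In a real job the "already processed" marker would live in durable storage such as a database or Redis, and the class would extend Job.

```php
<?php
// Hypothetical job: performs its side effect at most once per order ID,
// no matter how many times doWork() runs.
final class ChargeOrderJob
{
    /** @var array<string,bool> Stand-in for a durable "already processed" store */
    public static array $processed = [];

    /** Counts how often the side effect actually ran */
    public static int $charges = 0;

    private string $orderId;

    public function __construct(string $orderId)
    {
        $this->orderId = $orderId;
    }

    public function doWork(): bool
    {
        if (isset(self::$processed[$this->orderId])) {
            return true; // An earlier attempt already did the work
        }

        self::$charges++;                        // The side effect
        self::$processed[$this->orderId] = true; // Record completion

        return true;
    }
}

$job = new ChargeOrderJob('order-42');
$job->doWork();
$job->doWork(); // Simulated retry: no second charge
```

Keying the marker on a business identifier (here the order ID) rather than the job UUID means a duplicate job for the same order is also harmless.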
Advanced: Job Retry Logic
class ResilientJob extends Job
{
    public int $maxRetries = 3;
    public int $retryCount = 0;

    public function doWork(): bool
    {
        try {
            // Attempt work; performTask() stands for your own task logic
            $this->performTask();
            return true;
        } catch (\Exception $e) {
            $this->retryCount++;

            if ($this->retryCount >= $this->maxRetries) {
                // Max retries reached, log and fail permanently
                logger()->log("Job {$this->uuid} failed after {$this->maxRetries} attempts");
                return true; // Return true to prevent re-queueing
            }

            logger()->log("Job {$this->uuid} failed, retry {$this->retryCount}/{$this->maxRetries}");
            return false; // Retry
        }
    }
}
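To see how the retry-counting pattern above plays out, the sketch below runs a job that always fails through a minimal in-memory worker loop. AlwaysFailingJob and drain() are hypothetical, and the loop assumes the same job object (with its retryCount) comes back on each attempt. Whether Aeros preserves property values across re-queues is not shown in the source.

```php
<?php
// Stand-in job that fails on every attempt, using the same counting logic as above.
final class AlwaysFailingJob
{
    public int $maxRetries = 3;
    public int $retryCount = 0;

    public function doWork(): bool
    {
        $this->retryCount++;

        // true  = retries exhausted, stop re-queueing
        // false = ask the worker to try again
        return $this->retryCount >= $this->maxRetries;
    }
}

// Minimal worker loop: re-queue the job whenever doWork() returns false.
function drain(SplQueue $queue): int
{
    $attempts = 0;

    while (!$queue->isEmpty()) {
        $job = $queue->dequeue();
        $attempts++;

        if (!$job->doWork()) {
            $queue->enqueue($job);
        }
    }

    return $attempts;
}

$queue = new SplQueue();
$job = new AlwaysFailingJob();
$queue->enqueue($job);

$attempts = drain($queue); // The job runs three times, then stops being re-queued
```

Returning true after the final attempt stops the retries, but it also means the queue sees the job as successful, so the log line is the only record of the permanent failure.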
Configuration
Ensure your cache configuration includes a queue connection:
return [
    'connections' => [
        'queue' => [
            'driver' => 'redis',
        ],
    ],
];
The queue system uses the cache('queue') connection for all operations.