Skip to main content

Overview

Aeros provides a robust queue system for processing background jobs asynchronously. The system uses Redis for reliable job storage and supports both blocking and polling worker modes.

Creating Jobs

All jobs must extend the Job abstract class and implement the doWork() method:
app/Jobs/SendEmailJob.php
use Aeros\Src\Classes\Job;

class SendEmailJob extends Job
{
    public string $email;
    public string $subject;
    public string $body;
    
    public function __construct(string $email, string $subject, string $body)
    {
        parent::__construct(); // Generates UUID
        
        $this->email = $email;
        $this->subject = $subject;
        $this->body = $body;
    }
    
    public function doWork(): bool
    {
        try {
            // Send email logic
            mail($this->email, $this->subject, $this->body);
            
            return true; // Job succeeded
        } catch (\Exception $e) {
            logger()->log("Email job failed: " . $e->getMessage());
            return false; // Job failed, will be retried
        }
    }
}
Each job automatically receives a unique UUID when constructed, enabling tracking and preventing duplicate processing.

Dispatching Jobs

Push Single Job

// Push a job instance
$job = new SendEmailJob('[email protected]', 'Welcome', 'Hello!');
queue()->push($job);

// Push a job class (will be instantiated)
queue()->push(SendEmailJob::class);

Push Multiple Jobs

$jobs = [
    new SendEmailJob('[email protected]', 'Subject 1', 'Body 1'),
    new SendEmailJob('[email protected]', 'Subject 2', 'Body 2'),
    new ProcessDataJob($data),
];

queue()->push($jobs);

Custom Pipeline

Organize jobs into different pipelines (queues):
// Default pipeline: {APP_NAME}_gx_pipelines
queue()->push($job);

// Custom pipeline
queue()->push($job, 'emails');
queue()->push($job, 'reports');
The queue system prevents duplicate jobs by checking if a job of the same class already exists in the pipeline. If found, the new job won’t be added.

Processing Jobs

Blocking workers wait for jobs without polling, providing better performance and lower latency:
worker.php
// Process jobs indefinitely
while (true) {
    // Blocks for 5 seconds waiting for a job
    queue()->processPipelineBlocking('*', 5);
}
# Run the worker
php worker.php

Custom Blocking Configuration

// Block for 10 seconds
queue()->processPipelineBlocking('emails', 10);

// Infinite blocking (wait forever)
queue()->processPipelineBlocking('*', 0);

// Process multiple pipelines
queue()->processPipelineBlocking(['emails', 'reports'], 5);

Manual Job Processing

For fine-grained control, manually pop and process jobs:
// Blocking pop (waits for a job)
$job = queue()->blockingPop('*', 5);

if ($job) {
    $result = $job->doWork();
    
    if ($result) {
        logger()->log("Job {$job->uuid} completed");
    } else {
        logger()->log("Job {$job->uuid} failed");
        // Failed jobs are automatically re-queued
    }
}

Non-Blocking Pop

Pop a job immediately without waiting:
$job = queue()->pop('emails');

if ($job) {
    $job->doWork();
}

Polling Worker (Legacy)

Polling workers are less efficient than blocking workers and should be avoided for production use. Use processPipelineBlocking() instead.
// Polls the queue (not recommended)
while (true) {
    queue()->processPipeline('*');
    sleep(1); // Prevent CPU overuse
}

Job States & Locking

The queue system manages three job states:
  • locked - Job is currently being processed
  • completed - Job finished successfully
  • failed - Job failed and was re-queued

Job Locking

Jobs are automatically locked during processing to prevent duplicate execution:
// Job is locked with 10-second timeout
queue()->processPipelineBlocking();

// If job takes longer than 10 seconds, lock expires
// and another worker can pick it up

Check Job Status

// Get completed jobs
$completedJobs = queue()->getJobStatus('emails', Queue::COMPLETED_STATE);

foreach ($completedJobs as $job) {
    echo "Job {$job['uuid']} completed at {$job['timestamp']}\n";
}

// Get failed jobs
$failedJobs = queue()->getJobStatus('emails', Queue::FAILED_STATE);

Clear Job Status

// Clear completed jobs from all pipelines
queue()->clearJobStatus(Queue::COMPLETED_STATE, '*');

// Clear failed jobs from specific pipeline
queue()->clearJobStatus(Queue::FAILED_STATE, 'emails');

// Clear all statuses
queue()->clearJobStatus('*', '*');

Pipeline Management

Default Pipeline

By default, jobs are pushed to {APP_NAME}_gx_pipelines:
// These are equivalent
queue()->push($job);
queue()->push($job, '*');
queue()->push($job, 'all');

Multiple Pipelines

Process jobs from multiple pipelines:
// Process all pipelines
queue()->processPipelineBlocking(['emails', 'reports', 'notifications'], 5);

Example: Email Queue Worker

worker-emails.php
#!/usr/bin/env php
<?php

require __DIR__ . '/bootstrap.php';

echo "Email worker started...\n";

while (true) {
    try {
        // Process email jobs with 5-second timeout
        $result = queue()->processPipelineBlocking('emails', 5);
        
        if ($result === false) {
            // No jobs available (timeout reached)
            continue;
        }
        
        echo "Processed email job\n";
        
    } catch (\Exception $e) {
        logger()->log("Worker error: " . $e->getMessage());
        sleep(5); // Wait before retrying
    }
}
Run the worker:
php worker-emails.php

Example: Multi-Pipeline Worker

worker-all.php
#!/usr/bin/env php
<?php

require __DIR__ . '/bootstrap.php';

$pipelines = ['emails', 'reports', 'notifications'];

echo "Multi-pipeline worker started...\n";

while (true) {
    try {
        queue()->processPipelineBlocking($pipelines, 5);
    } catch (\Exception $e) {
        logger()->log("Worker error: " . $e->getMessage());
    }
}

Best Practices

Use Blocking Workers

Always prefer processPipelineBlocking() over processPipeline() for better performance and lower resource usage.

Idempotent Jobs

Design jobs to be idempotent so they can be safely retried if they fail.

Return Boolean

Always return true for success and false for failure in doWork(). Failed jobs are automatically re-queued.

Handle Exceptions

Catch exceptions in doWork() and return false to trigger retry logic.

Monitor Job Status

Regularly check and clear completed/failed job statuses to prevent memory bloat.

Separate Pipelines

Use different pipelines for different job types to enable prioritization and dedicated workers.

Advanced: Job Retry Logic

class ResilientJob extends Job
{
    public int $maxRetries = 3;
    public int $retryCount = 0;
    
    public function doWork(): bool
    {
        try {
            // Attempt work
            $this->performTask();
            return true;
            
        } catch (\Exception $e) {
            $this->retryCount++;
            
            if ($this->retryCount >= $this->maxRetries) {
                // Max retries reached, log and fail permanently
                logger()->log("Job {$this->uuid} failed after {$this->maxRetries} attempts");
                return true; // Return true to prevent re-queueing
            }
            
            logger()->log("Job {$this->uuid} failed, retry {$this->retryCount}/{$this->maxRetries}");
            return false; // Retry
        }
    }
}

Configuration

Ensure your cache configuration includes a queue connection:
config/cache.php
return [
    'connections' => [
        'queue' => [
            'driver' => 'redis',
        ],
    ],
];
The queue system uses the cache('queue') connection for all operations.

Build docs developers (and LLMs) love