The Queue system processes jobs through one or more named pipelines. It supports blocking (non-polling) job retrieval and tracks each job's state as locked, completed, or failed.

Queue Class

Namespace

Aeros\Src\Classes\Queue

Constants

const DEFAULT_PIPELINE = 'gx_pipelines';
const LOCKED_STATE = 'locked';
const COMPLETED_STATE = 'completed';
const FAILED_STATE = 'failed';
const DEFAULT_BLOCK_TIMEOUT = 5; // seconds

Methods

blockingPop

Blocking pop operation that waits for a job without polling. This is the recommended solution for real-time queue processing.
public function blockingPop(
    string|array $pipelineName = '*',
    int $timeout = self::DEFAULT_BLOCK_TIMEOUT
): Job|bool
pipelineName
string|array
default:"'*'"
Pipeline name(s) to listen to. Use '*' or 'all' for the default pipeline.
timeout
int
default:"5"
Seconds to wait for a job. Use 0 for infinite timeout.
Returns: Job instance if a job is available, false otherwise

Usage Example

$queue = new Queue();

// Wait up to 5 seconds for a job
$job = $queue->blockingPop();

// Wait indefinitely
$job = $queue->blockingPop('*', 0);

// Listen to specific pipeline
$job = $queue->blockingPop('email_queue', 10);

processPipelineBlocking

Processes a pipeline using blocking pop. This is the recommended method for workers.
public function processPipelineBlocking(
    string|array $pipelineName = '*',
    int $timeout = self::DEFAULT_BLOCK_TIMEOUT
): mixed
pipelineName
string|array
default:"'*'"
Pipeline name(s) to process.
timeout
int
default:"5"
Block timeout in seconds.
Returns: true on successful job execution, false otherwise

Usage Example

$queue = new Queue();

// Worker loop using blocking pop: each call waits up to 5 seconds for a job
while (true) {
    $queue->processPipelineBlocking('default', 5);
}
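
The loop above never exits, which makes it hard to stop a worker cleanly or to test it. The following is a minimal sketch of a bounded variant. `runWorker`, `$processOnce`, `$shouldStop`, and `$maxJobs` are hypothetical names, not part of the Queue API; in real use `$processOnce` would wrap a call such as `$queue->processPipelineBlocking('default', 5)`. It also assumes that a strict `true` means one job ran successfully, as the documented return value suggests.

```php
<?php
/**
 * Calls $processOnce until $shouldStop returns true or $maxJobs
 * successful jobs have run. Returns the number of successful jobs.
 */
function runWorker(callable $processOnce, callable $shouldStop, int $maxJobs = 1000): int
{
    $processed = 0;

    while (!$shouldStop() && $processed < $maxJobs) {
        // Only a strict true is counted; false covers both a timeout
        // and a failed job in this sketch.
        if ($processOnce() === true) {
            $processed++;
        }
    }

    return $processed;
}

// Usage with a fake queue: two successful jobs, then two timeouts.
$results = [true, true, false, false];
$calls = 0;

$count = runWorker(
    function () use (&$results) { return array_shift($results) ?? false; },
    function () use (&$calls) { return ++$calls > 4; } // stop after four iterations
);
```

Because the blocking call itself provides the wait, the loop needs no `sleep()` between iterations; the stop check simply runs once per timeout or job.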

push

Adds a Job or an array of jobs to a pipeline tail in the queue system.
public function push(
    array|string|Job $jobs,
    string $pipelineName = '*'
): bool
jobs
array|string|Job
One or more jobs to add. Can be a Job instance, job class name, or array of either.
pipelineName
string
default:"'*'"
Pipeline name to add jobs to.
Returns: true on success

Usage Example

$queue = new Queue();

// Push single job instance
$queue->push(new SendEmailJob());

// Push job by class name
$queue->push(SendEmailJob::class);

// Push multiple jobs
$queue->push([
    new SendEmailJob(),
    new ProcessImageJob(),
]);

// Push to specific pipeline
$queue->push(new SendEmailJob(), 'email_pipeline');

pushJob

Pushes a single job to a pipeline. Prevents duplicate jobs of the same class.
public function pushJob(string $pipelineName, Job $job): bool
pipelineName
string
Pipeline name.
job
Job
Job instance to push.
Returns: true if job was added, false if duplicate exists
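
To make "duplicate jobs of the same class" concrete, here is a small in-memory model of that rule. It is an illustration only, not the Aeros implementation: `InMemoryPipeline`, `ReportJob`, and `CleanupJob` are hypothetical names, and the real method also takes a pipeline name.

```php
<?php
// Hypothetical in-memory model of the duplicate check.
final class InMemoryPipeline
{
    /** @var array<string, object> queued jobs keyed by class name */
    private array $jobs = [];

    public function pushJob(object $job): bool
    {
        $class = get_class($job);

        if (isset($this->jobs[$class])) {
            return false; // a job of this class is already queued
        }

        $this->jobs[$class] = $job;
        return true;
    }
}

// Placeholder job classes for the demonstration.
final class ReportJob {}
final class CleanupJob {}

$pipeline = new InMemoryPipeline();
$first  = $pipeline->pushJob(new ReportJob());  // added
$second = $pipeline->pushJob(new ReportJob());  // rejected: same class
$third  = $pipeline->pushJob(new CleanupJob()); // added: different class
```

Under this model, two jobs of the same class are treated as duplicates even if they carry different data, so it is worth checking the boolean returned by `pushJob()` rather than assuming the job was queued.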

pop

Removes a Job from the pipeline's head and returns it. The caller then runs the job by calling doWork(), as the usage example shows.
public function pop(string $pipelineName = '*'): Job|bool
pipelineName
string
default:"'*'"
Pipeline name to pop from.
Returns: Job instance or false if no jobs available

Usage Example

$job = $queue->pop('default');

if ($job) {
    $job->doWork();
}

processPipeline

Processes and executes all jobs from a pipeline.
This method uses polling and should be avoided for workers. Use processPipelineBlocking() instead for better performance.
public function processPipeline(array|string $pipelineName = '*'): mixed
pipelineName
array|string
default:"'*'"
Pipeline name(s) to process. Use '*' or 'all' to process all pipelines.
Returns: Array of job statuses if processing multiple pipelines, boolean for single pipeline

Usage Example

// Process default pipeline
$queue->processPipeline();

// Process all pipelines
$queue->processPipeline('all');

// Process specific pipeline
$queue->processPipeline('email_queue');

// Process multiple pipelines
$statuses = $queue->processPipeline(['email_queue', 'image_queue']);

getJobStatus

Gets a list of job UUIDs and their timestamps.
public function getJobStatus(
    string $pipelineName = '*',
    string $state = Queue::COMPLETED_STATE
): array
pipelineName
string
default:"'*'"
Pipeline name to query.
state
string
default:"'completed'"
Job state to filter by: 'completed', 'failed', or 'locked'.
Returns: Array of job status objects with uuid and timestamp properties

Usage Example

// Get completed jobs
$completed = $queue->getJobStatus('default', Queue::COMPLETED_STATE);

// Get failed jobs
$failed = $queue->getJobStatus('default', Queue::FAILED_STATE);
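
The returned statuses can be post-processed with ordinary array code. The sketch below picks out entries older than a cutoff, for example to spot jobs that have stayed locked for too long. `staleJobUuids` is a hypothetical helper, and the code assumes `timestamp` is an integer Unix timestamp, which the reference above does not confirm.

```php
<?php
/**
 * Returns the UUIDs of statuses older than $maxAgeSeconds.
 * Assumes each status exposes `uuid` and an integer Unix `timestamp`.
 */
function staleJobUuids(array $statuses, int $maxAgeSeconds, int $now): array
{
    $stale = [];

    foreach ($statuses as $status) {
        if ($now - (int) $status->timestamp > $maxAgeSeconds) {
            $stale[] = $status->uuid;
        }
    }

    return $stale;
}

// Stand-in data shaped like the documented return value.
$statuses = [
    (object) ['uuid' => 'job-a', 'timestamp' => 1000],
    (object) ['uuid' => 'job-b', 'timestamp' => 1900],
];

// With "now" fixed at 2000, only job-a is more than 600 seconds old.
$stale = staleJobUuids($statuses, 600, 2000);
```

In real use the first argument would be the result of a call such as `$queue->getJobStatus('default', Queue::LOCKED_STATE)` and `$now` would be `time()`.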

clearJobStatus

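Deletes all cached job statuses that match the given state and pipeline name. Note the argument order: state comes first, then pipeline name, which is the reverse of getJobStatus().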
public function clearJobStatus(
    string $state = '*',
    string $pipelineName = '*'
): int
state
string
default:"'*'"
State to clear: 'completed', 'failed', 'locked', or '*' for all.
pipelineName
string
default:"'*'"
Pipeline name to clear statuses from.
Returns: Number of statuses deleted

Usage Example

// Clear all completed jobs
$count = $queue->clearJobStatus(Queue::COMPLETED_STATE);

// Clear all statuses for a pipeline
$count = $queue->clearJobStatus('*', 'email_queue');

Job Class

Namespace

Aeros\Src\Classes\Job

Properties

public string $uuid;
Each job is automatically assigned a unique UUID on construction.

Methods

__construct

Constructor that adds a UUID to each job.
public function __construct()

doWork

Abstract method that runs the job work. Must be implemented by all job classes.
abstract public function doWork(): bool
Returns: true if job completed successfully, false otherwise

Creating Custom Jobs

To create a custom job, extend the Job class and implement the doWork() method:
use Aeros\Src\Classes\Job;

class SendEmailJob extends Job
{
    private string $to;
    private string $subject;
    private string $body;

    public function __construct(string $to, string $subject, string $body)
    {
        parent::__construct();
        $this->to = $to;
        $this->subject = $subject;
        $this->body = $body;
    }

    public function doWork(): bool
    {
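        try {
            // mail() signals failure by returning false, not by throwing
            if (!mail($this->to, $this->subject, $this->body)) {
                logger()->error('Email failed: mail() returned false');
                return false;
            }
            return true;
        } catch (\Throwable $e) {
            logger()->error('Email failed: ' . $e->getMessage());
            return false;
        }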
    }
}
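
Since doWork() reports its outcome through the return value, a job is easier to test when the side effect lives in a method that a test subclass can override. The following is a sketch of that pattern. The `Job` class here is a stand-in so the block runs on its own, and `NotifyJob`, `FailingNotifyJob`, and `deliver()` are hypothetical names. The job holds only plain data, so it stays serializable, which matters because jobs are serialized when pushed.

```php
<?php
// Stand-in for Aeros\Src\Classes\Job so this sketch is self-contained.
abstract class Job
{
    public string $uuid;

    public function __construct()
    {
        // Random hex id used here in place of a real UUID.
        $this->uuid = bin2hex(random_bytes(16));
    }

    abstract public function doWork(): bool;
}

class NotifyJob extends Job
{
    public function __construct(private string $to)
    {
        parent::__construct();
    }

    public function doWork(): bool
    {
        try {
            return $this->deliver($this->to);
        } catch (\Throwable $e) {
            return false; // report failure instead of crashing the worker
        }
    }

    // Overridable seam; a real job would call a mailer here.
    protected function deliver(string $to): bool
    {
        return $to !== '';
    }
}

// Test double that simulates a transport failure.
class FailingNotifyJob extends NotifyJob
{
    protected function deliver(string $to): bool
    {
        throw new \RuntimeException('SMTP down');
    }
}

$ok     = (new NotifyJob('a@example.com'))->doWork();
$failed = (new FailingNotifyJob('a@example.com'))->doWork();
```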

Queue Job Flow

  1. Job Creation - Job is instantiated and assigned a UUID
  2. Push to Queue - Job is serialized and added to a pipeline
  3. Worker Picks Job - Worker retrieves job using blocking or regular pop
  4. Job Locking - Job is locked to prevent duplicate processing
  5. Execution - doWork() method is called
  6. State Update - Job marked as completed or failed
  7. Unlock - Job lock is removed
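
Steps 4 to 7 above can be modelled in a few lines. This is an illustrative sketch only, not the Aeros implementation: `runLifecycle` is a hypothetical function, the two arrays stand in for whatever storage the queue really uses, and `$work` plays the role of doWork().

```php
<?php
/**
 * Runs one job through lock -> execute -> state update -> unlock.
 * Returns the final state, or null if the job was already locked.
 */
function runLifecycle(string $uuid, callable $work, array &$locks, array &$states): ?string
{
    if (isset($locks[$uuid])) {
        return null; // another worker already holds this job
    }

    $locks[$uuid] = true; // step 4: lock

    try {
        $ok = $work() === true; // step 5: execute
    } catch (\Throwable $e) {
        $ok = false; // an exception counts as a failure
    }

    $states[$uuid] = $ok ? 'completed' : 'failed'; // step 6: state update
    unset($locks[$uuid]);                          // step 7: unlock

    return $states[$uuid];
}

$locks = [];
$states = [];

$a = runLifecycle('job-1', fn () => true, $locks, $states);
$b = runLifecycle('job-2', fn () => throw new \RuntimeException('boom'), $locks, $states);

// Simulate a job that another worker has already locked.
$locks['job-3'] = true;
$c = runLifecycle('job-3', fn () => true, $locks, $states);
```

Catching every `\Throwable` before the state update means the lock is always released, so a crashing job cannot stay locked indefinitely in this model.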

Best Practices

  • Use processPipelineBlocking() for worker processes instead of processPipeline()
  • Implement proper error handling in doWork() method
  • Return true for successful execution, false for failures
  • Failed jobs are automatically re-queued for retry, so write doWork() so that it is safe to run more than once
  • Use specific pipeline names to organize different job types
  • Monitor job statuses using getJobStatus() for debugging
