
Overview

Middleware wraps around workflow and activity execution, providing hooks for logging, locking, event dispatching, and custom behavior.

Built-in Middleware

The package includes two essential middleware classes:

WithoutOverlappingMiddleware

Prevents concurrent execution of workflows and activities using cache-based locking.
Location: src/Middleware/WithoutOverlappingMiddleware.php
Purpose:
  • Ensures a given workflow instance is never executed by two workers at once
  • Prevents a workflow from running while its own activities are executing, and vice versa
  • Uses the cache as a distributed lock
How It Works:
From src/Exception.php:66-76, workflows automatically use this middleware:
public function middleware()
{
    return [
        new WithoutOverlappingMiddleware(
            $this->storedWorkflow->id,
            WithoutOverlappingMiddleware::WORKFLOW,
            0,
            15
        ),
    ];
}
From src/Activity.php:135-146, activities also use it:
public function middleware()
{
    return [
        new WithoutOverlappingMiddleware(
            $this->storedWorkflow->id,
            WithoutOverlappingMiddleware::ACTIVITY,
            0,
            $this->timeout
        ),
        new ActivityMiddleware(),
    ];
}
Lock Types:
// Workflow lock - exclusive
const WORKFLOW = 1;

// Activity lock - concurrent activities allowed
const ACTIVITY = 2;
Constructor:
public function __construct(
    $workflowId,
    $type,              // WORKFLOW or ACTIVITY
    $releaseAfter = 0,  // Seconds before retry
    $expiresAfter = 0   // Lock expiration (0 = no expiration)
)
Locking Strategy: From src/Middleware/WithoutOverlappingMiddleware.php:74-137:
  1. Workflow Lock:
    • Acquires lock only if no workflow OR activities are running
    • Blocks all activities while workflow executes
    • Uses workflow semaphore counter
  2. Activity Lock:
    • Acquires lock if no workflow is running
    • Multiple activities can run concurrently
    • Uses activity semaphore list with unique keys
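The two rules above can be modeled in a few lines. This is a simplified in-memory sketch, not the package's implementation: the class and method names are hypothetical, and the real middleware keeps its state in the cache so it is shared between workers, which this model does not attempt.

```php
<?php
declare(strict_types=1);

// Simplified in-memory model of the locking rules described above.
// NOT the package's code; all names here are hypothetical.
final class OverlapLockModel
{
    private int $workflowCount = 0;   // stands in for the workflow semaphore counter

    /** @var array<string, true> */
    private array $activityKeys = []; // stands in for the activity semaphore list

    public function acquireWorkflow(): bool
    {
        // A workflow needs exclusive access: no workflow and no activities running.
        if ($this->workflowCount > 0 || $this->activityKeys !== []) {
            return false;
        }
        $this->workflowCount = 1;
        return true;
    }

    public function acquireActivity(string $uniqueKey): bool
    {
        // Activities only wait for a running workflow, not for each other.
        if ($this->workflowCount > 0) {
            return false;
        }
        $this->activityKeys[$uniqueKey] = true;
        return true;
    }

    public function releaseWorkflow(): void
    {
        $this->workflowCount = 0;
    }

    public function releaseActivity(string $uniqueKey): void
    {
        unset($this->activityKeys[$uniqueKey]);
    }
}

$lock = new OverlapLockModel();
var_dump($lock->acquireActivity('uuid-1')); // first activity starts
var_dump($lock->acquireActivity('uuid-2')); // second activity runs concurrently
var_dump($lock->acquireWorkflow());         // refused while activities are running
$lock->releaseActivity('uuid-1');
$lock->releaseActivity('uuid-2');
var_dump($lock->acquireWorkflow());         // granted once all activities finished
var_dump($lock->acquireActivity('uuid-3')); // refused while the workflow is running
```

A job that fails to acquire its lock is not dropped; per the constructor above, it is released back to the queue and retried after $releaseAfter seconds.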
Example Lock Keys:
// Workflow
laravel-workflow-overlap:wf_abc123:workflow

// Activities
laravel-workflow-overlap:wf_abc123:activity
laravel-workflow-overlap:wf_abc123:activity:uuid-1
laravel-workflow-overlap:wf_abc123:activity:uuid-2

ActivityMiddleware

Handles activity lifecycle events and result propagation.
Location: src/Middleware/ActivityMiddleware.php
Purpose:
  • Dispatches activity lifecycle events
  • Captures activity results
  • Propagates results to workflow
  • Handles activity failures
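Event Flow: From src/Middleware/ActivityMiddleware.php:23-66:
  1. ActivityStarted - Dispatched when the activity begins
  2. Activity Execution - Runs the activity
  3. ActivityCompleted - Dispatched on success, after the result has been passed back to the workflow
  4. ActivityFailed - Dispatched instead of ActivityCompleted if the activity throws; the exception is then re-thrown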
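The ordering can be illustrated without the framework. The sketch below is a hypothetical stand-in that records event names in an array rather than dispatching the package's event classes; it also records the completion immediately, whereas the real middleware defers it until the lock has been released.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for the event flow: appends event names to $events
 * instead of dispatching real events.
 *
 * @param callable(): mixed $activity
 * @param list<string>      $events
 */
function runWithLifecycle(callable $activity, array &$events): mixed
{
    $events[] = 'ActivityStarted';

    try {
        $result = $activity();
    } catch (\Throwable $e) {
        $events[] = 'ActivityFailed';
        throw $e; // failures still propagate to the caller
    }

    $events[] = 'ActivityCompleted';
    return $result;
}

// Successful run: started, then completed.
$events = [];
runWithLifecycle(fn () => 'ok', $events);
print_r($events);

// Failing run: started, then failed, and the exception is re-thrown.
$events = [];
try {
    runWithLifecycle(function () {
        throw new \RuntimeException('boom');
    }, $events);
} catch (\RuntimeException $e) {
    // expected in this sketch
}
print_r($events);
```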
Implementation:
public function handle($job, $next): void
{
    $this->job = $job;
    $this->uuid = (string) Str::uuid();

    // Dispatch started event
    ActivityStarted::dispatch(
        $job->storedWorkflow->id,
        $this->uuid,
        $job::class,
        $job->index,
        json_encode($job->arguments),
        now()->format('Y-m-d\TH:i:s.u\Z')
    );

    try {
        // Execute activity
        $this->result = $next($job);
        
        // Register callback for after lock release
        $job->onUnlock = fn (bool $shouldSignal) => $this->onUnlock($shouldSignal);
    } catch (\Throwable $throwable) {
        // Dispatch failed event
        ActivityFailed::dispatch(...);
        throw $throwable;
    }
}
Result Propagation: From src/Middleware/ActivityMiddleware.php:68-91, after the lock is released:
public function onUnlock(bool $shouldSignal): void
{
    try {
        // Resume workflow with result
        $this->job->storedWorkflow->toWorkflow()
            ->next($this->job->index, $this->job->now, $this->job::class, $this->result, $shouldSignal);

        // Dispatch completed event
        ActivityCompleted::dispatch(
            $this->job->storedWorkflow->id,
            $this->uuid,
            json_encode($this->result),
            now()->format('Y-m-d\TH:i:s.u\Z'),
            $this->job::class,
            $this->job->index
        );
    } catch (TransitionNotFound) {
        // Workflow not ready, release job to retry
        if ($this->job->storedWorkflow->toWorkflow()->running()) {
            $this->job->release();
        }
    }
}

Custom Middleware

A custom middleware is a class with a handle($job, $next) method. It can be attached to workflows or activities:

Logging Middleware

namespace App\Workflow\Middleware;

use Illuminate\Support\Facades\Log;

class LoggingMiddleware
{
    public function handle($job, $next)
    {
        $startTime = microtime(true);
        
        Log::info('Job starting', [
            'job' => get_class($job),
            'workflow_id' => $job->storedWorkflow->id ?? null,
        ]);
        
        try {
            $result = $next($job);
            
            Log::info('Job completed', [
                'job' => get_class($job),
                'duration' => microtime(true) - $startTime,
            ]);
            
            return $result;
        } catch (\Throwable $e) {
            Log::error('Job failed', [
                'job' => get_class($job),
                'duration' => microtime(true) - $startTime,
                'error' => $e->getMessage(),
            ]);
            
            throw $e;
        }
    }
}

Performance Monitoring Middleware

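use App\Models\Metric; // Assumed application model; not part of the package
use Illuminate\Support\Facades\Log;

class PerformanceMiddleware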
{
    public function handle($job, $next)
    {
        $startTime = microtime(true);
        $startMemory = memory_get_usage();
        
        $result = $next($job);
        
        $duration = (microtime(true) - $startTime) * 1000; // ms
        $memory = (memory_get_usage() - $startMemory) / 1024 / 1024; // MB
        
        // Log slow jobs
        if ($duration > 5000) {
            Log::warning('Slow job detected', [
                'job' => get_class($job),
                'duration_ms' => $duration,
                'memory_mb' => $memory,
            ]);
        }
        
        // Store metrics
        Metric::create([
            'type' => 'job_performance',
            'job' => get_class($job),
            'duration' => $duration,
            'memory' => $memory,
        ]);
        
        return $result;
    }
}

Rate Limiting Middleware

use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\RateLimiter;

class RateLimitMiddleware
{
    public function __construct(
        private string $key,
        private int $maxAttempts = 10,
        private int $decayMinutes = 1
    ) {}
    
    public function handle($job, $next)
    {
        $key = $this->key . ':' . ($job->storedWorkflow->id ?? 'unknown');
        
        if (RateLimiter::tooManyAttempts($key, $this->maxAttempts)) {
            $seconds = RateLimiter::availableIn($key);
            
            Log::info('Rate limit hit, releasing job', [
                'job' => get_class($job),
                'retry_after' => $seconds,
            ]);
            
            $job->release($seconds);
            return;
        }
        
        RateLimiter::hit($key, $this->decayMinutes * 60);
        
        return $next($job);
    }
}

Timeout Middleware

class TimeoutMiddleware
{
    public function __construct(private int $seconds = 60) {}
    
    public function handle($job, $next)
    {
        // Requires the pcntl extension (CLI workers on Unix-like systems).
        // Without async signals the handler below is never invoked while the job runs.
        // Note: Laravel's queue worker also uses SIGALRM for its own job timeout.
        pcntl_async_signals(true);
        
        // Set up timeout handler
        pcntl_signal(SIGALRM, function () {
            throw new \RuntimeException(
                'Job exceeded timeout of ' . $this->seconds . ' seconds'
            );
        });
        
        pcntl_alarm($this->seconds);
        
        try {
            return $next($job);
        } finally {
            pcntl_alarm(0); // Cancel the alarm on success or failure
        }
    }
}

Error Recovery Middleware

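use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;

class ErrorRecoveryMiddleware
{
    public function handle($job, $next)
    {
        try {
            return $next($job);
        } catch (\PDOException $e) {
            // Treated here as a lost database connection. Laravel's QueryException
            // also extends PDOException, so other query errors land here too.
            Log::warning('Database connection lost, reconnecting...');
            DB::reconnect();
            
            // Retry the job once (only safe if the job is idempotent)
            return $next($job);
        } catch (\RedisException $e) {
            // Redis connection lost (phpredis). Drop the cached connection
            // so the next Redis call opens a fresh one.
            Log::warning('Redis connection lost, reconnecting...');
            Redis::purge();
            
            return $next($job);
        }
    }
}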

Applying Middleware

To Activities

Override the middleware() method:
use Workflow\Activity;

class PaymentActivity extends Activity
{
    public function middleware()
    {
        return [
            // Include default middleware
            ...parent::middleware(),
            
            // Add custom middleware
            new LoggingMiddleware(),
            new PerformanceMiddleware(),
            new RateLimitMiddleware('payment', maxAttempts: 5),
        ];
    }
    
    public function execute($amount)
    {
        // Activity logic
    }
}

To Workflows

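Override middleware() on the workflow. This example redeclares the overlap lock with custom timings instead of calling parent::middleware():
use Workflow\Middleware\WithoutOverlappingMiddleware;
use Workflow\Workflow;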

class OrderWorkflow extends Workflow
{
    public function middleware()
    {
        return [
            new WithoutOverlappingMiddleware(
                $this->storedWorkflow->id,
                WithoutOverlappingMiddleware::WORKFLOW,
                releaseAfter: 5,
                expiresAfter: 300
            ),
            new LoggingMiddleware(),
            new PerformanceMiddleware(),
        ];
    }
}

Conditional Middleware

Apply middleware based on conditions:
class PaymentActivity extends Activity
{
    public function middleware()
    {
        $middleware = parent::middleware();
        
        // Add rate limiting only in production
        if (app()->environment('production')) {
            $middleware[] = new RateLimitMiddleware('payment');
        }
        
        // Add extra logging for high-value transactions
        if (($this->arguments[0] ?? 0) > 10000) {
            $middleware[] = new DetailedLoggingMiddleware();
            $middleware[] = new AlertingMiddleware();
        }
        
        return $middleware;
    }
}

Middleware Execution Order

Middleware runs in the order it is listed. The first entry is the outermost layer; the last entry wraps the job itself:
public function middleware()
{
    return [
        new FirstMiddleware(),   // 1. Executes first (outer)
        new SecondMiddleware(),  // 2. Executes second
        new ThirdMiddleware(),   // 3. Executes third (inner)
    ];
}
Execution flow:
FirstMiddleware::handle() {
    SecondMiddleware::handle() {
        ThirdMiddleware::handle() {
            // Actual job execution
        }
    }
}
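
The nesting can be reproduced with a small stand-alone pipeline. This is a sketch under stated assumptions: TraceMiddleware and runThrough() are hypothetical helpers written for this example, not part of the package or of Laravel, whose own pipeline does the equivalent wrapping internally.

```php
<?php
declare(strict_types=1);

// Hypothetical middleware that records when it is entered and left.
final class TraceMiddleware
{
    /** @var list<string> */
    public static array $trace = [];

    public function __construct(private string $name) {}

    public function handle(object $job, callable $next): mixed
    {
        self::$trace[] = "enter {$this->name}";
        $result = $next($job);
        self::$trace[] = "leave {$this->name}";
        return $result;
    }
}

/**
 * Minimal stand-in for a job pipeline: wraps $core in the middleware so that
 * the first array entry becomes the outermost layer.
 *
 * @param list<object> $middleware
 */
function runThrough(array $middleware, object $job, callable $core): mixed
{
    $pipeline = array_reduce(
        array_reverse($middleware),
        fn (callable $next, object $layer): callable =>
            fn (object $job) => $layer->handle($job, $next),
        $core
    );

    return $pipeline($job);
}

$result = runThrough(
    [new TraceMiddleware('first'), new TraceMiddleware('second'), new TraceMiddleware('third')],
    new stdClass(),
    function (object $job): string {
        TraceMiddleware::$trace[] = 'job';
        return 'done';
    }
);

// Layers are entered first-to-third and left third-to-first.
print_r(TraceMiddleware::$trace);
```

Code placed before $next($job) therefore runs in list order, while code placed after it runs in reverse order.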

Middleware Communication

Pass data between middleware layers:
class ContextMiddleware
{
    public function handle($job, $next)
    {
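        // Add context to job. auth() and request() only carry data when this runs
        // during an HTTP request; inside a queue worker they are typically empty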
        $job->context = [
            'user_id' => auth()->id(),
            'ip' => request()->ip(),
            'timestamp' => now(),
        ];
        
        return $next($job);
    }
}

class LoggingMiddleware
{
    public function handle($job, $next)
    {
        // Access context from previous middleware
        Log::info('Job started', $job->context ?? []);
        
        return $next($job);
    }
}
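
Assigning a property that the job class does not declare is deprecated as of PHP 8.2. One way around that, sketched here with hypothetical class names and an illustrative value, is to declare the property on the job so every layer can read and write it:

```php
<?php
declare(strict_types=1);

// Hypothetical job class for this sketch; a real activity would extend
// the package's Activity class and declare the property there.
final class ContextAwareJob
{
    /** @var array<string, mixed> Declared up front so middleware can write to it */
    public array $context = [];
}

// Outer layer: writes a value into the shared context.
final class TagContext
{
    public function handle(object $job, callable $next): mixed
    {
        $job->context['trace_id'] = 'trace-123'; // illustrative value
        return $next($job);
    }
}

// Inner layer: reads what the outer layer wrote.
final class ReadContext
{
    public function handle(object $job, callable $next): mixed
    {
        $job->context['seen_by_reader'] = isset($job->context['trace_id']);
        return $next($job);
    }
}

$job = new ContextAwareJob();

// Chain the two layers by hand: TagContext (outer) -> ReadContext (inner) -> job.
$result = (new TagContext())->handle(
    $job,
    fn (object $job) => (new ReadContext())->handle(
        $job,
        fn (object $job) => $job->context
    )
);

print_r($result);
```

Only layers listed after the writer see the value, so the middleware that adds the context has to come first in the list.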

Testing Middleware

use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\Log;
use Tests\TestCase;
use Workflow\WorkflowStub;

class MiddlewareTest extends TestCase
{
    use RefreshDatabase;

    public function test_logging_middleware()
    {
        Log::spy();

        $workflow = WorkflowStub::make(OrderWorkflow::class);
        $workflow->start('order_123');

        Log::shouldHaveReceived('info')
            ->with('Job starting', \Mockery::type('array'));
    }

    public function test_rate_limiting_middleware()
    {
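        // Exceed rate limit (assumes RateLimitedJob uses RateLimitMiddleware
        // and that the database queue driver is configured)
        for ($i = 0; $i < 15; $i++) {
            dispatch(new RateLimitedJob());
        }

        // Process every job that is currently available; with --once only a
        // single job would run and the limit would never be reached
        $this->artisan('queue:work --stop-when-empty');

        // Some jobs should be released (rate limited)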
        $this->assertDatabaseHas('jobs', [
            'queue' => 'default',
            'reserved_at' => null, // Not processed
        ]);
    }

    public function test_timeout_middleware()
    {
        $this->expectException(\RuntimeException::class);
        $this->expectExceptionMessage('Job exceeded timeout');

        $job = new SlowJob();
        (new TimeoutMiddleware(seconds: 1))->handle($job, fn() => sleep(5));
    }
}
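
Because a middleware is just an object with a handle() method, it can also be exercised without the framework by passing a stub job and a closure, as the timeout test above does. The sketch below assumes a hypothetical DurationMiddleware that reports to an injected callable instead of a facade, which keeps the test free of mocks.

```php
<?php
declare(strict_types=1);

// Hypothetical middleware used only for this sketch: reports the job's class
// and duration to an injected callable instead of a logging facade.
final class DurationMiddleware
{
    /** @param callable(string, float): void $report */
    public function __construct(private $report) {}

    public function handle(object $job, callable $next): mixed
    {
        $start = microtime(true);

        try {
            return $next($job);
        } finally {
            // Runs on success and on failure.
            ($this->report)($job::class, microtime(true) - $start);
        }
    }
}

// Collect reports in an array so they can be inspected afterwards.
$reports = [];
$middleware = new DurationMiddleware(
    function (string $job, float $seconds) use (&$reports): void {
        $reports[] = $job;
    }
);

// 1. The result of $next is passed through unchanged.
$result = $middleware->handle(new stdClass(), fn (object $job) => 'ok');

// 2. Exceptions from $next still reach the caller, and the report is still made.
$caught = null;
try {
    $middleware->handle(new stdClass(), function (object $job) {
        throw new RuntimeException('boom');
    });
} catch (RuntimeException $e) {
    $caught = $e->getMessage();
}

var_dump($result, $caught, count($reports));
```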

Best Practices

Each middleware should have a single responsibility:
// ✅ Good - focused
class LoggingMiddleware { /* Only logging */ }
class RateLimitMiddleware { /* Only rate limiting */ }

// ❌ Bad - too much responsibility
class SuperMiddleware { /* Logging, rate limiting, caching, etc. */ }
Ensure the middleware chain continues:
// ✅ Good
public function handle($job, $next)
{
    // Before logic
    $result = $next($job);
    // After logic
    return $result;
}

// ❌ Bad - breaks chain
public function handle($job, $next)
{
    // Logic
    return; // Next middleware never called!
}
Re-throw exceptions after handling them:
public function handle($job, $next)
{
    try {
        return $next($job);
    } catch (\Exception $e) {
        // Log or handle
        Log::error('Job failed', ['error' => $e]);
        
        // Re-throw to maintain error handling
        throw $e;
    }
}
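Make middleware configurable through constructor parameters:
use Illuminate\Support\Facades\Cache;

class CacheMiddleware
{
    public function __construct(
        private int $ttl = 3600,
        private string $driver = 'redis'
    ) {}
    
    public function handle($job, $next)
    {
        // Use configuration; return the value so the result reaches the caller
        return Cache::driver($this->driver)
            ->remember($job->id, $this->ttl, fn() => $next($job));
    }
}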
