Skip to main content
This example demonstrates an ETL (Extract, Transform, Load) data pipeline that processes data from multiple sources in parallel, applies transformations, and loads results into a data warehouse.

Overview

The pipeline workflow:
  • Extracts data from multiple sources simultaneously
  • Transforms data in parallel batches
  • Validates transformed data
  • Loads data into warehouse
  • Sends completion notification

Workflow Implementation

<?php

namespace App\Workflows;

use function Workflow\{activity, all, timer};
use Workflow\Workflow;

class DataPipelineWorkflow extends Workflow
{
    /**
     * Run the ETL pipeline end to end: extract from every source in
     * parallel, merge, transform and load in parallel batches, then
     * summarize and notify.
     *
     * NOTE(review): this method is a workflow generator — each `yield`
     * hands an activity (or group of activities) to the engine and
     * suspends until it completes. The statement order must remain
     * deterministic because the workflow is replayed from history.
     *
     * @param string $pipelineId  Unique identifier for this pipeline run.
     * @param array  $sources     List of ['type' => string, 'config' => array] source descriptors.
     * @param string $destination Warehouse table the transformed data is loaded into.
     * @return array Result with keys 'pipelineId', 'status', 'recordsProcessed', 'summary'.
     */
    public function execute(string $pipelineId, array $sources, string $destination)
    {
        // Step 1: Extract data from multiple sources in parallel.
        // activity() returns a promise immediately, so all extractions
        // are dispatched before we wait on any of them.
        $extractPromises = [];
        foreach ($sources as $source) {
            $extractPromises[] = activity(
                ExtractDataActivity::class,
                $source['type'],
                $source['config']
            );
        }
        
        // Wait for all extractions to complete
        $extractedData = yield all($extractPromises);

        // Step 2: Merge extracted data
        $mergedData = yield activity(
            MergeDataActivity::class,
            $extractedData
        );

        // Step 3: Transform data in parallel batches
        $batchSize = 1000;
        $batches = array_chunk($mergedData, $batchSize);
        
        $transformPromises = [];
        foreach ($batches as $batchIndex => $batch) {
            $transformPromises[] = activity(
                TransformDataActivity::class,
                $batch,
                $batchIndex
            );
        }
        
        $transformedBatches = yield all($transformPromises);
        // Flatten the per-batch results back into a single record list.
        $transformedData = array_merge(...$transformedBatches);

        // Step 4: Validate transformed data
        $validationResult = yield activity(
            ValidateDataActivity::class,
            $transformedData
        );

        if (!$validationResult['valid']) {
            // Log validation errors and continue with valid records only
            // (error tolerance: invalid rows are dropped, not fatal).
            yield activity(
                LogValidationErrorsActivity::class,
                $pipelineId,
                $validationResult['errors']
            );
            
            $transformedData = $validationResult['validRecords'];
        }

        // Step 5: Load data into warehouse in parallel batches
        $loadBatches = array_chunk($transformedData, $batchSize);
        
        $loadPromises = [];
        foreach ($loadBatches as $batchIndex => $batch) {
            $loadPromises[] = activity(
                LoadDataActivity::class,
                $destination,
                $batch,
                $batchIndex
            );
        }
        
        // Each load activity returns the number of rows it inserted.
        $loadResults = yield all($loadPromises);

        // Step 6: Create pipeline execution summary
        $summary = yield activity(
            CreatePipelineSummaryActivity::class,
            $pipelineId,
            [
                'sources' => count($sources),
                'extracted' => count($mergedData),
                'transformed' => count($transformedData),
                'loaded' => array_sum($loadResults),
                // 'errors' is always set by ValidateDataActivity; the null
                // coalesce is a defensive fallback for a missing key.
                'errors' => $validationResult['errors'] ?? [],
            ]
        );

        // Step 7: Send completion notification
        yield activity(
            SendPipelineNotificationActivity::class,
            $pipelineId,
            $summary
        );

        return [
            'pipelineId' => $pipelineId,
            'status' => 'completed',
            'recordsProcessed' => array_sum($loadResults),
            'summary' => $summary,
        ];
    }
}

Activity Implementations

Extract Data Activity

<?php

namespace App\Activities;

use App\Services\DataExtractors\DatabaseExtractor;
use App\Services\DataExtractors\ApiExtractor;
use App\Services\DataExtractors\FileExtractor;
use Workflow\Activity;

class ExtractDataActivity extends Activity
{
    /** Maximum execution time in seconds (5 minutes). */
    public int $timeout = 300;

    /** Number of attempts before the activity is marked as failed. */
    public int $tries = 3;

    /**
     * Extract raw records from a single configured source.
     *
     * @param string $sourceType One of 'database', 'api', or 'file'.
     * @param array  $config     Extractor-specific configuration.
     * @return array The extracted records.
     * @throws \InvalidArgumentException When the source type is unknown.
     */
    public function execute(string $sourceType, array $config): array
    {
        $extractor = match ($sourceType) {
            'database' => new DatabaseExtractor($config),
            'api' => new ApiExtractor($config),
            'file' => new FileExtractor($config),
            // InvalidArgumentException is the precise type for a bad
            // argument; it extends \Exception, so existing catch blocks
            // still work.
            default => throw new \InvalidArgumentException("Unknown source type: {$sourceType}"),
        };

        $data = $extractor->extract();

        // Laravel's default logger does not interpolate PSR-3 {placeholder}
        // tokens in the message, so keep the message static and pass the
        // values through the context array.
        logger()->info('Data extraction completed', [
            'count' => count($data),
            'type' => $sourceType,
        ]);

        return $data;
    }
}

Transform Data Activity

<?php

namespace App\Activities;

use App\Services\DataTransformers\CustomerTransformer;
use Workflow\Activity;

class TransformDataActivity extends Activity
{
    /** Maximum execution time in seconds (3 minutes). */
    public int $timeout = 180;

    /**
     * Transform one batch of raw records into the warehouse format.
     *
     * Records that fail to transform are logged and skipped, so the
     * returned array may contain fewer entries than the input batch.
     *
     * @param array $batch      Raw records to transform.
     * @param int   $batchIndex Position of this batch, used for log context.
     * @return array The successfully transformed records.
     */
    public function execute(array $batch, int $batchIndex): array
    {
        $transformer = new CustomerTransformer();
        $transformed = [];
        $processed = 0;

        foreach ($batch as $record) {
            try {
                $transformed[] = $transformer->transform($record);
            } catch (\Exception $e) {
                logger()->warning("Transform failed for record in batch {$batchIndex}", [
                    'error' => $e->getMessage(),
                    'record' => $record,
                ]);
            }

            // Heartbeat periodically *during* the work. The previous single
            // heartbeat after the loop fired only once everything was done,
            // which provides no liveness signal for long transformations.
            if (++$processed % 100 === 0) {
                $this->heartbeat();
            }
        }

        return $transformed;
    }
}

Validate Data Activity

<?php

namespace App\Activities;

use Illuminate\Support\Facades\Validator;
use Workflow\Activity;

class ValidateDataActivity extends Activity
{
    /** Validation rules applied to every transformed record. */
    private const RULES = [
        'customer_id' => 'required|integer',
        'email' => 'required|email',
        'name' => 'required|string',
        'created_at' => 'required|date',
    ];

    /**
     * Partition records into valid and invalid sets.
     *
     * @param array $data Transformed records, keyed by their position.
     * @return array ['valid' => bool, 'validRecords' => array, 'errors' => array]
     *               where each error entry carries the original index,
     *               the offending record, and the validator messages.
     */
    public function execute(array $data): array
    {
        $passed = [];
        $failed = [];

        foreach ($data as $position => $record) {
            $check = Validator::make($record, self::RULES);

            if ($check->fails()) {
                $failed[] = [
                    'index' => $position,
                    'record' => $record,
                    'errors' => $check->errors()->toArray(),
                ];
                continue;
            }

            $passed[] = $record;
        }

        return [
            'valid' => $failed === [],
            'validRecords' => $passed,
            'errors' => $failed,
        ];
    }
}

Load Data Activity

<?php

namespace App\Activities;

use Illuminate\Support\Facades\DB;
use Workflow\Activity;

class LoadDataActivity extends Activity
{
    /** Hard limit on execution time, in seconds. */
    public int $timeout = 300;

    /** Retry up to three times on failure. */
    public int $tries = 3;

    /**
     * Insert one batch of records into the destination table.
     *
     * The entire batch runs inside a single transaction, so any failure
     * rolls back every row of this batch before the activity retries.
     *
     * @param string $destination Fully-qualified destination table name.
     * @param array  $batch       Records to insert.
     * @param int    $batchIndex  Position of this batch, used for logging.
     * @return int Number of rows inserted.
     */
    public function execute(
        string $destination,
        array $batch,
        int $batchIndex
    ): int {
        $insertedCount = 0;

        DB::transaction(function () use ($destination, $batch, &$insertedCount) {
            foreach ($batch as $row) {
                DB::table($destination)->insert($row);

                // Signal liveness every 100 inserts so long-running loads
                // are not considered stalled.
                if (++$insertedCount % 100 === 0) {
                    $this->heartbeat();
                }
            }
        });

        logger()->info("Loaded batch {$batchIndex}: {$insertedCount} records");

        return $insertedCount;
    }
}

Create Pipeline Summary Activity

<?php

namespace App\Activities;

use App\Models\PipelineExecution;
use Workflow\Activity;

class CreatePipelineSummaryActivity extends Activity
{
    /**
     * Persist a summary row for this pipeline run and return its identity
     * together with the raw stats.
     *
     * @param string $pipelineId Identifier of the pipeline run.
     * @param array  $stats      Keys: 'sources', 'extracted', 'transformed',
     *                           'loaded', 'errors'.
     * @return array ['executionId' => mixed, 'stats' => array]
     */
    public function execute(string $pipelineId, array $stats): array
    {
        // Wall-clock duration from workflow start to now, in whole seconds.
        $durationSeconds = now()->diffInSeconds($this->workflowStartTime());

        $record = PipelineExecution::create([
            'pipeline_id' => $pipelineId,
            'status' => 'completed',
            'sources_count' => $stats['sources'],
            'extracted_count' => $stats['extracted'],
            'transformed_count' => $stats['transformed'],
            'loaded_count' => $stats['loaded'],
            'error_count' => count($stats['errors']),
            'errors' => $stats['errors'],
            'duration' => $durationSeconds,
        ]);

        return [
            'executionId' => $record->id,
            'stats' => $stats,
        ];
    }
}

Starting the Workflow

use App\Workflows\DataPipelineWorkflow;
use Workflow\WorkflowStub;

// Create a stub for a fresh workflow instance.
$workflow = WorkflowStub::make(DataPipelineWorkflow::class);

// Start the pipeline with three sources — a legacy database table, a REST
// API, and a CSV file on S3. Each source entry carries the extractor type
// plus its type-specific configuration.
$workflow->start(
    pipelineId: 'customer-sync-2026-03',
    sources: [
        [
            'type' => 'database',
            'config' => [
                'connection' => 'legacy_db',
                'table' => 'customers',
                'where' => ['active' => true],
            ],
        ],
        [
            'type' => 'api',
            'config' => [
                'url' => 'https://api.example.com/customers',
                'token' => env('API_TOKEN'),
            ],
        ],
        [
            'type' => 'file',
            'config' => [
                'path' => 's3://bucket/customers.csv',
                'format' => 'csv',
            ],
        ],
    ],
    destination: 'data_warehouse.customers'
);

Scheduling Recurring Pipelines

You can schedule the pipeline to run daily:
// app/Console/Kernel.php

/**
 * Register a daily 02:00 run of the customer-sync pipeline.
 */
protected function schedule(Schedule $schedule)
{
    $schedule->call(function () {
        $workflow = WorkflowStub::make(DataPipelineWorkflow::class);
        
        // A date-stamped id keeps each daily run distinct.
        $workflow->start(
            pipelineId: 'daily-customer-sync-' . now()->format('Y-m-d'),
            sources: config('pipelines.customer_sources'),
            destination: 'data_warehouse.customers'
        );
    })->dailyAt('02:00');
}

Monitoring Pipeline Progress

use Workflow\WorkflowStub;

// Re-attach to an existing workflow by its id.
$workflow = WorkflowStub::load($workflowId);

// Get activity logs to see progress — one entry per executed activity.
$logs = $workflow->logs();

foreach ($logs as $log) {
    echo "{$log->class}: {$log->status}\n";
}

// Check if still running
if ($workflow->running()) {
    echo "Pipeline is processing...\n";
}

// Get final results once the workflow has completed; output() returns the
// array the workflow's execute() method returned.
if ($workflow->completed()) {
    $result = $workflow->output();
    echo "Processed {$result['recordsProcessed']} records\n";
}

Error Handling and Retries

The pipeline handles various failure scenarios:
  • Source Unavailable: Automatic retry with exponential backoff (3 attempts)
  • Transformation Errors: Individual record errors are logged, pipeline continues
  • Validation Failures: Invalid records are logged, valid records proceed
  • Load Failures: Transaction rollback per batch, automatic retry
  • Timeout Protection: Activities have timeout limits with heartbeat support

Performance Optimization

Parallel Extraction

// Multiple sources extracted simultaneously: each activity() call returns
// a promise right away, and `yield all(...)` suspends the workflow until
// every promise has resolved.
$extractPromises = [
    activity(ExtractDataActivity::class, $source1),
    activity(ExtractDataActivity::class, $source2),
    activity(ExtractDataActivity::class, $source3),
];
$results = yield all($extractPromises);

Batch Processing

// Process 1000 records per batch
$batches = array_chunk($data, 1000);

// Queue one transform activity per chunk, then wait for all of them.
$promises = [];
foreach ($batches as $batch) {
    $promises[] = activity(TransformDataActivity::class, $batch);
}

$transformed = yield all($promises);

Key Features

  • Parallel Processing: Extract and load operations run concurrently
  • Batch Processing: Large datasets processed in manageable chunks
  • Error Tolerance: Continues processing despite individual record failures
  • Heartbeat Support: Long-running activities send heartbeats
  • Comprehensive Logging: Detailed audit trail of all operations
  • Automatic Retries: Transient failures automatically retry
  • Progress Tracking: Monitor pipeline execution in real-time

Build docs developers (and LLMs) love