<?php
namespace App\Workflows;
use function Workflow\{activity, all, timer};
use Workflow\Workflow;
/**
 * Workflow that runs an ETL-style pipeline: extract from several sources in
 * parallel, merge, transform and load in parallel batches, then create a
 * summary and send a notification.
 *
 * NOTE(review): full record sets travel between steps as activity arguments
 * and results. If the workflow engine persists those payloads, very large
 * pipelines may be costly to store or replay — confirm against the engine's
 * storage limits.
 */
class DataPipelineWorkflow extends Workflow
{
    /** Number of records handed to each TransformDataActivity / LoadDataActivity call. */
    private const BATCH_SIZE = 1000;

    /**
     * Run the pipeline.
     *
     * @param string $pipelineId  Identifier forwarded to the logging, summary and notification activities.
     * @param array  $sources     Source descriptors; every entry must provide 'type' and 'config' keys,
     *                            which are forwarded to ExtractDataActivity.
     * @param string $destination Target forwarded to every LoadDataActivity call.
     *
     * @return \Generator Workflow coroutine. Its return value is an array with keys
     *                    'pipelineId', 'status' (always 'completed'), 'recordsProcessed'
     *                    (sum of the LoadDataActivity results) and 'summary'
     *                    (the CreatePipelineSummaryActivity result).
     */
    public function execute(string $pipelineId, array $sources, string $destination): \Generator
    {
        // Step 1: start one extraction per source, then wait for the whole set.
        $extractPromises = [];
        foreach ($sources as $source) {
            $extractPromises[] = activity(
                ExtractDataActivity::class,
                $source['type'],
                $source['config'],
            );
        }
        $extractedData = yield all($extractPromises);

        // Step 2: merge the per-source results into a single record set.
        $mergedData = yield activity(MergeDataActivity::class, $extractedData);

        // Step 3: transform in parallel batches, then flatten the batch results
        // back into one list (batches are merged in batch order).
        $transformedBatches = yield all(
            $this->startBatchActivities(TransformDataActivity::class, $mergedData),
        );
        $transformedData = array_merge(...$transformedBatches);

        // Step 4: validate. On failure, log the errors and continue with the valid records only.
        // $transformedData is deliberately left untouched so the summary's 'transformed'
        // count reports records actually transformed rather than records that passed validation.
        $validationResult = yield activity(ValidateDataActivity::class, $transformedData);
        $recordsToLoad = $transformedData;
        if (!$validationResult['valid']) {
            yield activity(
                LogValidationErrorsActivity::class,
                $pipelineId,
                $validationResult['errors'],
            );
            $recordsToLoad = $validationResult['validRecords'];
        }

        // Step 5: load in parallel batches. The per-batch results are summed and reported
        // as the loaded count (presumably each LoadDataActivity returns rows written — confirm).
        $loadResults = yield all(
            $this->startBatchActivities(LoadDataActivity::class, $recordsToLoad, [$destination]),
        );
        $recordsLoaded = array_sum($loadResults);

        // Step 6: create the pipeline execution summary.
        $summary = yield activity(
            CreatePipelineSummaryActivity::class,
            $pipelineId,
            [
                'sources' => count($sources),
                'extracted' => count($mergedData),
                'transformed' => count($transformedData),
                'valid' => count($recordsToLoad),
                'loaded' => $recordsLoaded,
                'errors' => $validationResult['errors'] ?? [],
            ],
        );

        // Step 7: send the completion notification.
        yield activity(SendPipelineNotificationActivity::class, $pipelineId, $summary);

        return [
            'pipelineId' => $pipelineId,
            'status' => 'completed',
            'recordsProcessed' => $recordsLoaded,
            'summary' => $summary,
        ];
    }

    /**
     * Split $records into BATCH_SIZE chunks and start one activity per chunk
     * without waiting on any of them.
     *
     * Each activity receives ...$leadingArgs, then its chunk, then the chunk's
     * zero-based index — the same argument order the pipeline steps have always used.
     *
     * @param string $activityClass Activity class to start for every chunk.
     * @param array  $records       Records to split (keys are discarded by array_chunk).
     * @param array  $leadingArgs   Positional arguments placed before the chunk.
     *
     * @return array One pending activity per chunk, in chunk order (empty when $records is empty).
     */
    private function startBatchActivities(string $activityClass, array $records, array $leadingArgs = []): array
    {
        $promises = [];
        foreach (array_chunk($records, self::BATCH_SIZE) as $batchIndex => $batch) {
            // Positional args cannot follow an unpack, so build the full list first.
            $promises[] = activity($activityClass, ...[...$leadingArgs, $batch, $batchIndex]);
        }

        return $promises;
    }
}