The execution graph is a directed acyclic graph (DAG) that represents job dependencies and determines execution order. It automatically organizes jobs into levels where independent jobs can run in parallel.
Graph structure
The graph organizes jobs into execution levels from src/Graph.php:96-122:
public function toArray(): array
{
$sort = [];
$previous = [];
$sync = [];
$toIndex = 0;
foreach ($this->getSortAsc() as $job => $dependencies) {
$matchCount = 0;
foreach ($dependencies as $dependency) {
if (in_array($dependency, $previous, true)) {
++$matchCount;
break;
}
}
if ($matchCount === 1) {
$toIndex++;
$previous = [];
}
$sort[$toIndex][] = $job;
$previous[] = $job;
if ($this->jobs->find($job) !== null) {
$sync[$job] = $toIndex;
}
}
return $this->getSortJobs($sort, $sync);
}
Graph interface
From src/Interfaces/GraphInterface.php:14-57:
/**
* Describes the component in charge of defining job execution order, where each node contains async jobs.
*/
interface GraphInterface extends StringMappedInterface
{
/**
* Determines if the graph has the given `$job`.
*/
public function has(string $job): bool;
/**
* Retrieve dependencies for the given `$job`.
*/
public function get(string $job): VectorInterface;
/**
* Determines if the given `$job` has the given `$dependencies`.
*/
public function hasDependencies(string $job, string ...$dependencies): bool;
/**
* Return an instance with the specified `$name` and `$job` put.
*/
public function withPut(string $name, JobInterface $job): self;
/**
* Returns the graph as an array of arrays, where each array is a node with async jobs.
*/
public function toArray(): array;
}
Execution levels
Jobs are organized into levels based on dependencies:
$workflow = workflow(
a: async(new A()),
b: async(new B()),
c: async(new C(), input: response('a')),
d: async(new D(), input: response('a')),
e: async(new E(), input: response('c'), data: response('d'))
);
$graph = $workflow->jobs()->graph()->toArray();
// Result:
// [
// 0 => ['a', 'b'], // Level 0: No dependencies, run in parallel
// 1 => ['c', 'd'], // Level 1: Depend on 'a', run in parallel
// 2 => ['e'] // Level 2: Depends on 'c' and 'd'
// ]
Level 0: Independent jobs
Jobs with no dependencies execute first:workflow(
fetch1: async(new Fetch(), url: variable('url1')),
fetch2: async(new Fetch(), url: variable('url2')),
fetch3: async(new Fetch(), url: variable('url3'))
)
// All run in parallel at level 0
Level N: Dependent jobs
Jobs with dependencies execute after their dependencies:workflow(
fetch: async(new Fetch()), // Level 0
parse: async(
new Parse(),
data: response('fetch') // Level 1
),
store: async(
new Store(),
data: response('parse') // Level 2
)
)
Parallel execution within levels
Multiple async jobs at the same level run in parallel:workflow(
fetch: async(new Fetch()), // Level 0
processA: async(
new ProcessA(),
data: response('fetch') // Level 1 (parallel)
),
processB: async(
new ProcessB(),
data: response('fetch') // Level 1 (parallel)
),
processC: async(
new ProcessC(),
data: response('fetch') // Level 1 (parallel)
)
)
Building the graph
Jobs are added to the graph during workflow construction from src/Jobs.php:132-151:
private function putAdded(JobInterface ...$job): void
{
foreach ($job as $name => $item) {
$this->jobDependencies = $item->dependencies();
$name = strval($name);
$this->addMap($name, $item);
$this->jobs = $this->jobs->withPush($name);
$this->handleArguments($name, $item);
foreach ($item->runIf() as $runIf) {
$this->handleRunIfReference($runIf);
$this->handleRunIfVariable($name, $runIf);
}
foreach ($item->runIfNot() as $runIfNot) {
$this->handleRunIfReference($runIfNot);
$this->handleRunIfVariable($name, $runIfNot);
}
$this->storeReferences($name, $item);
$this->assertDependencies($name);
$this->graph = $this->graph->withPut($name, $item);
}
}
Dependency sources
Dependencies come from multiple sources:
Response references
Automatic dependencies from response() calls:
workflow(
fetch: async(new Fetch()),
process: async(
new Process(),
data: response('fetch') // Creates dependency: fetch → process
)
)
Explicit dependencies
Manual dependencies via withDepends():
workflow(
validate: async(new Validate(), data: variable('input')),
process: async(
new Process(),
data: variable('input')
)->withDepends('validate') // Explicit dependency: validate → process
)
Conditional dependencies
Boolean response references in withRunIf():
workflow(
validate: async(new Validate()),
process: async(new Process())
->withRunIf(
response('validate', 'isValid') // Creates dependency: validate → process
)
)
Synchronous jobs
Synchronous jobs force sequential execution from src/Graph.php:64-71:
if ($job->isSync()) {
if ($found === null) {
$new->jobs = $new->jobs->withPush($name);
}
} elseif ($found !== null) {
$new->jobs = $new->jobs->withRemove($found);
}
Example:
workflow(
a: async(new A()),
b: async(new B()),
c: sync(new C(), a: response('a')), // Sync job
d: async(new D(), c: response('c'))
)
// Graph:
// [
// 0 => ['a', 'b'], // Parallel async
// 1 => ['c'], // Sync job (blocks)
// 2 => ['d'] // Continues after sync
// ]
Synchronous jobs create a barrier - no jobs at subsequent levels execute until the sync job completes.
Visualizing the graph
Access the graph from a workflow run:
$run = run(
$workflow,
image: __DIR__ . '/src/php.jpeg',
saveDir: __DIR__ . '/src/output/',
);
$graph = $run->workflow()->jobs()->graph()->toArray();
echo "Workflow graph:\n";
foreach ($graph as $level => $jobs) {
echo " {$level}: " . implode('|', $jobs) . "\n";
}
// Output:
// 0: thumb|poster
// 1: storeThumb|storePoster
Cycle detection
The graph prevents circular dependencies from src/Graph.php:175-187:
private function assertNotSelfDependency(string $job, VectorInterface $vector): void
{
if (! $vector->contains($job)) {
return;
}
throw new InvalidArgumentException(
(string) message(
'Cannot declare job **%job%** as a self-dependency',
job: $job
)
);
}
Invalid example:
// This will throw an error
workflow(
a: async(
new A(),
data: response('b')
),
b: async(
new B(),
data: response('a') // Circular dependency!
)
)
Dependency validation
All job dependencies must exist from src/Jobs.php:457-471:
private function assertDependencies(string $job): void
{
$dependencies = $this->jobDependencies->toArray();
if (! $this->jobs->contains(...$dependencies)) {
$missing = array_diff($dependencies, $this->jobs->toArray());
throw new OutOfBoundsException(
(string) message(
'Job **%job%** has undeclared dependencies: `%dependencies%`',
job: $job,
dependencies: implode(', ', $missing),
)
);
}
}
Complex graph example
$workflow = workflow(
// Level 0: No dependencies
fetchUser: async(new FetchUser(), id: variable('userId')),
fetchSettings: async(new FetchSettings(), id: variable('userId')),
// Level 1: Depend on level 0
validateUser: async(
new ValidateUser(),
user: response('fetchUser')
),
applySettings: async(
new ApplySettings(),
user: response('fetchUser'),
settings: response('fetchSettings')
),
// Level 2: Depend on level 1
processUser: async(
new ProcessUser(),
user: response('applySettings')
)->withRunIf(
response('validateUser', 'isValid')
),
// Level 2: Parallel with processUser
notifyError: async(
new NotifyError(),
errors: response('validateUser', 'errors')
)->withRunIfNot(
response('validateUser', 'isValid')
),
// Level 3: Final step
logResult: sync(
new LogResult(),
userId: variable('userId')
)->withDepends('processUser')
);
$graph = $workflow->jobs()->graph()->toArray();
// [
// 0 => ['fetchUser', 'fetchSettings'],
// 1 => ['validateUser', 'applySettings'],
// 2 => ['processUser', 'notifyError'],
// 3 => ['logResult']
// ]
Maximize parallelization
Structure workflows to enable parallel execution:// Good: Independent jobs run in parallel
workflow(
fetch1: async(new Fetch(), url: variable('url1')),
fetch2: async(new Fetch(), url: variable('url2')),
fetch3: async(new Fetch(), url: variable('url3'))
)
// Suboptimal: Sequential execution
workflow(
fetch1: async(new Fetch(), url: variable('url1')),
fetch2: async(new Fetch(), url: variable('url2'))
->withDepends('fetch1'),
fetch3: async(new Fetch(), url: variable('url3'))
->withDepends('fetch2')
)
Minimize sync jobs
Use sync() only when necessary:workflow(
// Most jobs async for parallelization
prepare: async(new Prepare()),
fetch1: async(new Fetch()),
fetch2: async(new Fetch()),
// Only critical section is sync
updateDatabase: sync(
new UpdateDB(),
data1: response('fetch1'),
data2: response('fetch2')
)
)
Reduce dependency chains
Avoid unnecessarily long dependency chains:// Better: Shorter chain
workflow(
fetch: async(new Fetch()),
processA: async(new ProcessA(), data: response('fetch')),
processB: async(new ProcessB(), data: response('fetch')),
merge: async(
new Merge(),
a: response('processA'),
b: response('processB')
)
)
// Slower: Unnecessary sequential steps
workflow(
fetch: async(new Fetch()),
processA: async(new ProcessA(), data: response('fetch')),
processB: async(new ProcessB(), data: response('processA')),
merge: async(new Merge(), data: response('processB'))
)
Group related operations
Combine operations when they must execute together:// Consider combining these into one action
workflow(
step1: async(new SmallStep1()),
step2: async(new SmallStep2(), data: response('step1')),
step3: async(new SmallStep3(), data: response('step2'))
)
// Better: Single action with combined logic
workflow(
process: async(new CompleteProcess())
)
Best practices
Design for parallelism
Structure workflows to maximize concurrent execution:// Independent operations at the same level
workflow(
validateEmail: async(new ValidateEmail(), email: variable('email')),
validatePhone: async(new ValidatePhone(), phone: variable('phone')),
validateAddress: async(new ValidateAddress(), address: variable('address'))
)
Use meaningful job names
Job names appear in graph visualization:workflow(
fetchUserProfile: async(...),
fetchUserPreferences: async(...),
applyUserSettings: async(...)
)
// Much clearer than: job1, job2, job3
Leverage automatic dependencies
Let response references define the graph:workflow(
fetch: async(new Fetch()),
parse: async(new Parse(), data: response('fetch')),
store: async(new Store(), data: response('parse'))
)
// Dependencies automatically inferred
Monitor graph complexity
Keep graphs manageable:// If your graph has many levels or complex dependencies,
// consider breaking into multiple workflows
$workflow1 = workflow(
// Data gathering phase
...
);
$workflow2 = workflow(
// Processing phase
...
);
- Jobs - Creating jobs with dependencies
- Responses - How responses create dependencies
- Variables - Independent job inputs