Overview
Metaflow supports several DAG patterns: linear chains, static branches (split/join), dynamic fan-out (foreach), and runtime conditionals (split-switch). metaflow-dagster translates each pattern into the corresponding Dagster graph structure.
All graph shapes are fully supported — the compiler handles the wiring automatically.
Linear
The simplest pattern: each step calls self.next() with a single successor.
from metaflow import FlowSpec, step
class LinearFlow(FlowSpec):
    """A simple linear flow for testing the Dagster integration."""

    @step
    def start(self):
        # Seed an artifact that downstream steps will read.
        self.message = "hello from start"
        self.next(self.process)

    @step
    def process(self):
        self.result = self.message + " -> process"
        self.next(self.end)

    @step
    def end(self):
        # Verify the artifact survived the hop through the datastore.
        assert self.result == "hello from start -> process"
        print("LinearFlow completed:", self.result)
Generated Dagster Graph
Each step becomes a single @op with one input and one output:
@op(out=Out(str))
def op_start(context: OpExecutionContext) -> str:
    # First op of the run: mint the Metaflow run id, write the
    # parameters task, then execute the "start" step.
    run_id = _make_run_id(context.run_id)
    params_path = _run_init(context, run_id, "params", {})
    task_path = _run_step(context, "start", run_id, params_path, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
@op(ins={"start": In(str)}, out=Out(str))
def op_process(context: OpExecutionContext, start: str) -> str:
    # The upstream pathspec ("run_id/step/task_id") carries the run id.
    run_id = start.split("/")[0]
    task_path = _run_step(context, "process", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
@op(ins={"process": In(str)})
def op_end(context: OpExecutionContext, process: str) -> None:
    # Terminal op: nothing downstream, so no output is declared.
    run_id = process.split("/")[0]
    _run_step(context, "end", run_id, process, "1", ...)
@job
def LinearFlow():
    # Linear chain: start -> process -> end.
    op_end(op_process(op_start()))
Each op receives the upstream task pathspec (e.g., "dagster-abc123/start/1") and passes it to the Metaflow step CLI via --input-paths. Artifacts flow through the datastore, not through Dagster’s output system.
Branching (Split/Join)
Static branches: the start (or a mid-flow split step) fans out to multiple branches that run in parallel, then converge at a join step.
from metaflow import FlowSpec, step
class BranchingFlow(FlowSpec):
    """A branching flow that splits and rejoins."""

    @step
    def start(self):
        self.value = 10
        # Static split: both branches run in parallel.
        self.next(self.branch_a, self.branch_b)

    @step
    def branch_a(self):
        self.a_result = self.value * 2
        self.next(self.join)

    @step
    def branch_b(self):
        self.b_result = self.value + 5
        self.next(self.join)

    @step
    def join(self, inputs):
        # Merge artifacts from both parallel branches.
        self.merged_a = inputs.branch_a.a_result
        self.merged_b = inputs.branch_b.b_result
        self.next(self.end)

    @step
    def end(self):
        assert self.merged_a == 20
        assert self.merged_b == 15
        # NOTE: the original format string was extraction-mangled
        # ("a= %d b= %d "); reconstructed without the injected spaces.
        print("BranchingFlow completed: a=%d b=%d" % (self.merged_a, self.merged_b))
Generated Dagster Graph
The split step yields multiple named outputs. The join op receives all branch outputs as separate inputs:
# Split op (start step with multiple branches)
@op(out={"branch_a": Out(str), "branch_b": Out(str)})
def op_start(context: OpExecutionContext):
    run_id = _make_run_id(context.run_id)
    params_path = _run_init(context, run_id, "params", {})
    task_path = _run_step(context, "start", run_id, params_path, "1", ...)
    _add_step_metadata(context, task_path, output_name="branch_a")
    # Both branches receive the same upstream task pathspec.
    yield Output(task_path, output_name="branch_a")
    yield Output(task_path, output_name="branch_b")
# Branch ops (linear steps)
@op(ins={"start": In(str)}, out=Out(str))
def op_branch_a(context: OpExecutionContext, start: str) -> str:
    # Branch op: linear step fed by the split's "branch_a" output.
    run_id = start.split("/")[0]
    task_path = _run_step(context, "branch_a", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
@op(ins={"start": In(str)}, out=Out(str))
def op_branch_b(context: OpExecutionContext, start: str) -> str:
    # Branch op: linear step fed by the split's "branch_b" output.
    run_id = start.split("/")[0]
    task_path = _run_step(context, "branch_b", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
# Join op (receives multiple inputs)
@op(ins={"branch_a": In(str), "branch_b": In(str)}, out=Out(str))
def op_join(context: OpExecutionContext, branch_a, branch_b) -> str:
    run_id = branch_a.split("/")[0]
    # Join steps take every parent pathspec via a compressed list.
    input_paths = compress_list([branch_a, branch_b])
    task_path = _run_step(context, "join", run_id, input_paths, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
@job
def BranchingFlow():
    # Fan out from the split, then converge at the join.
    start_outputs = op_start()
    a = op_branch_a(start_outputs.branch_a)
    b = op_branch_b(start_outputs.branch_b)
    op_end(op_join(branch_a=a, branch_b=b))
Branches run in parallel in Dagster. The join step waits for all branches to complete, then receives a compressed list of input paths via --input-paths.
Foreach (Fan-out)
Dynamic parallelism: a step creates a runtime-determined list, then fans out to process each item in parallel.
from metaflow import FlowSpec, step
class ForeachFlow(FlowSpec):
    """A foreach flow that maps over a list of items."""

    @step
    def start(self):
        # The list length is only known at runtime.
        self.items = ["apple", "banana", "cherry"]
        self.next(self.process_item, foreach="items")

    @step
    def process_item(self):
        # self.input is the current foreach item.
        self.processed = self.input.upper()
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [i.processed for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        assert sorted(self.results) == ["APPLE", "BANANA", "CHERRY"]
        print("ForeachFlow completed:", self.results)
Generated Dagster Graph
The foreach step uses DynamicOut to yield one output per item. The body step is mapped over all items, and the join collects results:
# Foreach op (dynamic fan-out)
@op(out=DynamicOut(str))
def op_start(context: OpExecutionContext):
    run_id = _make_run_id(context.run_id)
    params_path = _run_init(context, run_id, "params", {})
    task_path = _run_step(context, "start", run_id, params_path, "1", ...)
    _add_step_metadata(context, task_path)
    # The split count is only known after the foreach step has run.
    num_splits = _get_foreach_splits(run_id, "start", "1")
    for _i in range(num_splits):
        # Encode the split index into the pathspec ("run_id/start/1//_i=N").
        # FIX: the f-string was extraction-mangled with spaces inside the
        # literal, which would break op_process_item's split on "//_i=".
        yield DynamicOutput(f"{task_path}//_i={_i}", mapping_key=str(_i))
# Body op (receives encoded split index)
@op(ins={"start": In(str)}, out=Out(str))
def op_process_item(context: OpExecutionContext, start: str) -> str:
    # start = "run_id/start/1//_i=N"
    parts = start.split("//_i=")
    base_path, split_index = parts[0], int(parts[1]) if len(parts) > 1 else 0
    run_id = base_path.split("/")[0]
    # Unique task id per split, e.g. "1-0", "1-1", "1-2".
    # FIX: the f-string was extraction-mangled ("1- { split_index } "),
    # which would have produced ids with embedded spaces.
    task_id = f"1-{split_index}"
    task_path = _run_step(
        context, "process_item", run_id, base_path, task_id,
        retry_count=context.retry_number,
        max_user_code_retries=0,
        tags=[],
        split_index=split_index,
        extra_env=None,
    )
    _add_step_metadata(context, task_path)
    return task_path
# Join op (collects dynamic results)
@op(ins={"foreach_results": In(list)}, out=Out(str))
def op_join(context: OpExecutionContext, foreach_results: list) -> str:
    run_id = foreach_results[0].split("/")[0]
    # Strip the encoded split index before handing paths to Metaflow.
    clean = [p.split("//_i=")[0] for p in foreach_results]
    input_paths = compress_list(clean)
    task_path = _run_step(context, "join", run_id, input_paths, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
@job
def ForeachFlow():
    # Dynamic fan-out: map the body op over every DynamicOutput,
    # then collect all results into the join.
    start_items = op_start()
    processed = start_items.map(op_process_item)
    op_end(op_join(processed.collect()))
The compiler reads _foreach_num_splits from the datastore after the foreach step completes to determine how many dynamic outputs to emit. Each body task gets --split-index N and a unique task ID like 1-0, 1-1, 1-2.
Conditional (Split-Switch)
Runtime branching: the flow decides at runtime which branch to take. Only one branch executes.
from metaflow import FlowSpec, Parameter, step
class ConditionalFlow(FlowSpec):
    """A conditional flow that takes different branches based on a parameter."""

    value = Parameter("value", default=42, type=int)

    @step
    def start(self):
        # Decide the route at runtime; only the chosen branch executes.
        self.route = "high" if self.value >= 50 else "low"
        self.next({"high": self.high_branch, "low": self.low_branch}, condition="route")

    @step
    def high_branch(self):
        # FIX: format strings below were extraction-mangled with a
        # trailing space inside the literal ("HIGH: %d "); reconstructed.
        self.result = "HIGH: %d" % self.value
        self.next(self.join)

    @step
    def low_branch(self):
        self.result = "LOW: %d" % self.value
        self.next(self.join)

    @step
    def join(self):
        # Only one branch ran, so no merge of competing values is needed.
        self.final = self.result
        self.next(self.end)

    @step
    def end(self):
        print("Result:", self.final)
Generated Dagster Graph
The split-switch op yields an optional output for each branch. The merge step receives all branches as optional inputs:
# Split-switch op (conditional start)
@op(out={"high_branch": Out(str, is_required=False), "low_branch": Out(str, is_required=False)})
def op_start(context: OpExecutionContext, config: ConditionalFlowConfig):
    run_id = _make_run_id(context.run_id)
    parameters = {"value": config.value}
    params_path = _run_init(context, run_id, "params", parameters)
    task_path = _run_step(context, "start", run_id, params_path, "1", ...)
    _add_step_metadata(context, task_path)
    # Emit only the output for the branch the flow actually chose;
    # the other optional output stays unemitted, so its op is skipped.
    _branch = _get_condition_branch(run_id, "start", "1")
    yield Output(task_path, output_name=_branch)
# Branch ops (optional execution)
@op(ins={"start": In(str)}, out=Out(str))
def op_high_branch(context: OpExecutionContext, start: str) -> str:
    # Runs only when the split-switch emitted the "high_branch" output.
    run_id = start.split("/")[0]
    task_path = _run_step(context, "high_branch", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
@op(ins={"start": In(str)}, out=Out(str))
def op_low_branch(context: OpExecutionContext, start: str) -> str:
    # Runs only when the split-switch emitted the "low_branch" output.
    run_id = start.split("/")[0]
    task_path = _run_step(context, "low_branch", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
# Merge op (receives optional inputs)
@op(
    ins={
        "high_branch": In(Optional[str], default_value=None),
        "low_branch": In(Optional[str], default_value=None),
    },
    out=Out(str),
)
def op_join(context: OpExecutionContext, high_branch, low_branch) -> str:
    # Exactly one branch ran; take whichever input is present.
    active_path = next(p for p in [high_branch, low_branch] if p is not None)
    run_id = active_path.split("/")[0]
    task_path = _run_step(context, "join", run_id, active_path, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path
@job
def ConditionalFlow():
    # Both branch ops are wired in, but only the one whose optional
    # input receives data will execute.
    start_outputs = op_start()
    h = op_high_branch(start_outputs.high_branch)
    l = op_low_branch(start_outputs.low_branch)
    op_end(op_join(high_branch=h, low_branch=l))
The compiler reads the _transition artifact from the datastore to determine which branch was taken. Only the active branch op receives data and executes; the other branch op is skipped.
Nested Foreach
You can nest foreach steps (e.g., outer loop over datasets, inner loop over hyperparameters). The compiler generates a compound op that encapsulates the inner foreach chain.
See the source at tests/flows/nested_foreach_flow.py for a full example.
Summary Table
| Metaflow Pattern | Graph Type | Dagster Translation |
| --- | --- | --- |
| Linear (`self.next(self.a)`) | linear | Single input, single output op |
| Split/Join (`self.next(self.a, self.b)`) | split / join | Multiple named outputs → multiple inputs |
| Foreach (`self.next(self.a, foreach="items")`) | foreach | `DynamicOut` → `.map()` → `.collect()` |
| Conditional (`self.next({...}, condition="key")`) | split-switch | Optional outputs → optional inputs |
Next Steps
- **How It Works** — understand the compilation and execution process.
- **Compilation Details** — explore what happens during `dagster create`.