Overview

Metaflow supports several DAG patterns: linear chains, static branches (split/join), dynamic fan-out (foreach), and runtime conditionals (split-switch). metaflow-dagster translates each pattern into the corresponding Dagster graph structure, and the compiler handles the wiring automatically, so all graph shapes are fully supported.

Linear

The simplest pattern: each step calls self.next() with a single successor.

Metaflow Code

linear_flow.py
from metaflow import FlowSpec, step

class LinearFlow(FlowSpec):
    """A simple linear flow for testing the Dagster integration."""

    @step
    def start(self):
        self.message = "hello from start"
        self.next(self.process)

    @step
    def process(self):
        self.result = self.message + " -> process"
        self.next(self.end)

    @step
    def end(self):
        assert self.result == "hello from start -> process"
        print("LinearFlow completed:", self.result)

Generated Dagster Graph

Each step becomes a single @op with one input and one output:
@op(out=Out(str))
def op_start(context: OpExecutionContext) -> str:
    run_id = _make_run_id(context.run_id)
    params_path = _run_init(context, run_id, "params", {})
    task_path = _run_step(context, "start", run_id, params_path, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

@op(ins={"start": In(str)}, out=Out(str))
def op_process(context: OpExecutionContext, start: str) -> str:
    run_id = start.split("/")[0]
    task_path = _run_step(context, "process", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

@op(ins={"process": In(str)})
def op_end(context: OpExecutionContext, process: str) -> None:
    run_id = process.split("/")[0]
    _run_step(context, "end", run_id, process, "1", ...)

@job
def LinearFlow():
    op_end(op_process(op_start()))
Each op receives the upstream task pathspec (e.g., "dagster-abc123/start/1") and passes it to the Metaflow step CLI via --input-paths. Artifacts flow through the datastore, not through Dagster’s output system.
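The handoff described above can be sketched as a small standalone helper. This is an illustrative sketch, not the actual metaflow-dagster internals: `parse_run_id` and `step_cli_args` are hypothetical names, but the pathspec shape and the `--run-id`/`--task-id`/`--input-paths` flags match the Metaflow step CLI usage shown throughout this page.

```python
def parse_run_id(pathspec: str) -> str:
    """A task pathspec like 'dagster-abc123/start/1' begins with the run ID."""
    return pathspec.split("/")[0]

def step_cli_args(step_name: str, pathspec: str, task_id: str) -> list[str]:
    """Build the CLI fragment a downstream op would pass to `python flow.py step ...`."""
    return [
        "step", step_name,
        "--run-id", parse_run_id(pathspec),
        "--task-id", task_id,
        "--input-paths", pathspec,
    ]

args = step_cli_args("process", "dagster-abc123/start/1", "1")
```

The op body itself never deserializes artifacts; it only threads this string through so the Metaflow runtime can locate the upstream task in the datastore.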

Branching (Split/Join)

Static branches: the start (or a mid-flow split step) fans out to multiple branches that run in parallel, then converge at a join step.

Metaflow Code

branching_flow.py
from metaflow import FlowSpec, step

class BranchingFlow(FlowSpec):
    """A branching flow that splits and rejoins."""

    @step
    def start(self):
        self.value = 10
        self.next(self.branch_a, self.branch_b)

    @step
    def branch_a(self):
        self.a_result = self.value * 2
        self.next(self.join)

    @step
    def branch_b(self):
        self.b_result = self.value + 5
        self.next(self.join)

    @step
    def join(self, inputs):
        self.merged_a = inputs.branch_a.a_result
        self.merged_b = inputs.branch_b.b_result
        self.next(self.end)

    @step
    def end(self):
        assert self.merged_a == 20
        assert self.merged_b == 15
        print("BranchingFlow completed: a=%d b=%d" % (self.merged_a, self.merged_b))

Generated Dagster Graph

The split step yields multiple named outputs. The join op receives all branch outputs as separate inputs:
# Split op (start step with multiple branches)
@op(out={"branch_a": Out(str), "branch_b": Out(str)})
def op_start(context: OpExecutionContext):
    run_id = _make_run_id(context.run_id)
    params_path = _run_init(context, run_id, "params", {})
    task_path = _run_step(context, "start", run_id, params_path, "1", ...)
    _add_step_metadata(context, task_path, output_name="branch_a")
    yield Output(task_path, output_name="branch_a")
    yield Output(task_path, output_name="branch_b")

# Branch ops (linear steps)
@op(ins={"start": In(str)}, out=Out(str))
def op_branch_a(context: OpExecutionContext, start: str) -> str:
    run_id = start.split("/")[0]
    task_path = _run_step(context, "branch_a", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

@op(ins={"start": In(str)}, out=Out(str))
def op_branch_b(context: OpExecutionContext, start: str) -> str:
    run_id = start.split("/")[0]
    task_path = _run_step(context, "branch_b", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

# Join op (receives multiple inputs)
@op(ins={"branch_a": In(str), "branch_b": In(str)}, out=Out(str))
def op_join(context: OpExecutionContext, branch_a: str, branch_b: str) -> str:
    run_id = branch_a.split("/")[0]
    input_paths = compress_list([branch_a, branch_b])
    task_path = _run_step(context, "join", run_id, input_paths, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

@job
def BranchingFlow():
    start_outputs = op_start()
    a = op_branch_a(start_outputs.branch_a)
    b = op_branch_b(start_outputs.branch_b)
    op_end(op_join(branch_a=a, branch_b=b))
Branches run in parallel in Dagster. The join step waits for all branches to complete, then receives a compressed list of input paths via --input-paths.
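To make the join's input handling concrete, here is a minimal sketch. The real compiler uses Metaflow's `compress_list` helper, which additionally shortens shared prefixes; this hypothetical `join_input_paths` just comma-joins the branch pathspecs after checking they belong to the same run, which conveys the same idea.

```python
def join_input_paths(branch_paths: list[str]) -> str:
    """Combine upstream branch pathspecs into one --input-paths value (sketch)."""
    run_ids = {p.split("/")[0] for p in branch_paths}
    assert len(run_ids) == 1, "all branches must belong to the same run"
    return ",".join(branch_paths)

paths = join_input_paths(["dagster-abc123/branch_a/1", "dagster-abc123/branch_b/1"])
```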

Foreach (Fan-out)

Dynamic parallelism: a step creates a runtime-determined list, then fans out to process each item in parallel.

Metaflow Code

foreach_flow.py
from metaflow import FlowSpec, step

class ForeachFlow(FlowSpec):
    """A foreach flow that maps over a list of items."""

    @step
    def start(self):
        self.items = ["apple", "banana", "cherry"]
        self.next(self.process_item, foreach="items")

    @step
    def process_item(self):
        self.processed = self.input.upper()
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [i.processed for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        assert sorted(self.results) == ["APPLE", "BANANA", "CHERRY"]
        print("ForeachFlow completed:", self.results)

Generated Dagster Graph

The foreach step uses DynamicOut to yield one output per item. The body step is mapped over all items, and the join collects results:
# Foreach op (dynamic fan-out)
@op(out=DynamicOut(str))
def op_start(context: OpExecutionContext):
    run_id = _make_run_id(context.run_id)
    params_path = _run_init(context, run_id, "params", {})
    task_path = _run_step(context, "start", run_id, params_path, "1", ...)
    _add_step_metadata(context, task_path)
    num_splits = _get_foreach_splits(run_id, "start", "1")
    for _i in range(num_splits):
        yield DynamicOutput(f"{task_path}//_i={_i}", mapping_key=str(_i))

# Body op (receives encoded split index)
@op(ins={"start": In(str)}, out=Out(str))
def op_process_item(context: OpExecutionContext, start: str) -> str:
    # start = "run_id/start/1//_i=N"
    parts = start.split("//_i=")
    base_path, split_index = parts[0], int(parts[1]) if len(parts) > 1 else 0
    run_id = base_path.split("/")[0]
    task_id = f"1-{split_index}"
    task_path = _run_step(
        context, "process_item", run_id, base_path, task_id,
        retry_count=context.retry_number,
        max_user_code_retries=0,
        tags=[],
        split_index=split_index,
        extra_env=None,
    )
    _add_step_metadata(context, task_path)
    return task_path

# Join op (collects dynamic results)
@op(ins={"foreach_results": In(list)}, out=Out(str))
def op_join(context: OpExecutionContext, foreach_results: list) -> str:
    run_id = foreach_results[0].split("/")[0]
    clean = [p.split("//_i=")[0] for p in foreach_results]
    input_paths = compress_list(clean)
    task_path = _run_step(context, "join", run_id, input_paths, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

@job
def ForeachFlow():
    start_items = op_start()
    processed = start_items.map(op_process_item)
    op_end(op_join(processed.collect()))
The compiler reads _foreach_num_splits from the datastore after the foreach step completes to determine how many dynamic outputs to emit. Each body task gets --split-index N and a unique task ID like 1-0, 1-1, 1-2.
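The `//_i=N` encoding used in the generated ops above can be sketched as a pair of pure functions. The helper names are illustrative, not the actual compiler internals, but the encoding and the derived task ID match the snippets on this page.

```python
def encode_split(task_path: str, index: int) -> str:
    """Tag a dynamic output with its split index, e.g. 'run/start/1//_i=2'."""
    return f"{task_path}//_i={index}"

def decode_split(encoded: str) -> tuple[str, int]:
    """Recover the base pathspec and split index; a plain pathspec maps to index 0."""
    parts = encoded.split("//_i=")
    return parts[0], int(parts[1]) if len(parts) > 1 else 0

base, idx = decode_split(encode_split("dagster-abc123/start/1", 2))
task_id = f"1-{idx}"  # unique per fan-out branch: 1-0, 1-1, 1-2, ...
```

The join op strips this suffix again (`p.split("//_i=")[0]`) before compressing the input paths, since the datastore only knows the plain pathspecs.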

Conditional (Split-Switch)

Runtime branching: the flow decides at runtime which branch to take. Only one branch executes.

Metaflow Code

conditional_flow.py
from metaflow import FlowSpec, Parameter, step

class ConditionalFlow(FlowSpec):
    """A conditional flow that takes different branches based on a parameter."""

    value = Parameter("value", default=42, type=int)

    @step
    def start(self):
        self.route = "high" if self.value >= 50 else "low"
        self.next({"high": self.high_branch, "low": self.low_branch}, condition="route")

    @step
    def high_branch(self):
        self.result = "HIGH: %d" % self.value
        self.next(self.join)

    @step
    def low_branch(self):
        self.result = "LOW: %d" % self.value
        self.next(self.join)

    @step
    def join(self):
        self.final = self.result
        self.next(self.end)

    @step
    def end(self):
        print("Result:", self.final)

Generated Dagster Graph

The split-switch op yields an optional output for each branch. The merge step receives all branches as optional inputs:
# Split-switch op (conditional start)
@op(out={"high_branch": Out(str, is_required=False), "low_branch": Out(str, is_required=False)})
def op_start(context: OpExecutionContext, config: ConditionalFlowConfig):
    run_id = _make_run_id(context.run_id)
    parameters = {"value": config.value}
    params_path = _run_init(context, run_id, "params", parameters)
    task_path = _run_step(context, "start", run_id, params_path, "1", ...)
    _add_step_metadata(context, task_path)
    _branch = _get_condition_branch(run_id, "start", "1")
    yield Output(task_path, output_name=_branch)

# Branch ops (optional execution)
@op(ins={"start": In(str)}, out=Out(str))
def op_high_branch(context: OpExecutionContext, start: str) -> str:
    run_id = start.split("/")[0]
    task_path = _run_step(context, "high_branch", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

@op(ins={"start": In(str)}, out=Out(str))
def op_low_branch(context: OpExecutionContext, start: str) -> str:
    run_id = start.split("/")[0]
    task_path = _run_step(context, "low_branch", run_id, start, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

# Merge op (receives optional inputs)
@op(
    ins={
        "high_branch": In(Optional[str], default_value=None),
        "low_branch": In(Optional[str], default_value=None)
    },
    out=Out(str)
)
def op_join(context: OpExecutionContext, high_branch: Optional[str], low_branch: Optional[str]) -> str:
    active_path = next(p for p in [high_branch, low_branch] if p is not None)
    run_id = active_path.split("/")[0]
    task_path = _run_step(context, "join", run_id, active_path, "1", ...)
    _add_step_metadata(context, task_path)
    return task_path

@job
def ConditionalFlow():
    start_outputs = op_start()
    h = op_high_branch(start_outputs.high_branch)
    l = op_low_branch(start_outputs.low_branch)
    op_end(op_join(high_branch=h, low_branch=l))
The compiler reads the _transition artifact from the datastore to determine which branch was taken. Only the active branch op receives data and executes; the other branch op is skipped.
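The merge logic reduces to picking the single non-None input. A minimal sketch (the `active_input` helper is hypothetical, not the compiler's actual code) that also guards against the invariant being violated:

```python
from typing import Optional

def active_input(*paths: Optional[str]) -> str:
    """Exactly one branch ran, so exactly one optional input holds a pathspec."""
    active = [p for p in paths if p is not None]
    if len(active) != 1:
        raise ValueError(f"expected exactly one active branch, got {len(active)}")
    return active[0]

path = active_input(None, "dagster-abc123/low_branch/1")
```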

Nested Foreach

You can nest foreach steps (e.g., outer loop over datasets, inner loop over hyperparameters). The compiler generates a compound op that encapsulates the inner foreach chain. See the source at tests/flows/nested_foreach_flow.py for a full example.

Summary Table

| Metaflow Pattern | Graph Type | Dagster Translation |
| --- | --- | --- |
| Linear (`self.next(self.a)`) | linear | Single input, single output op |
| Split/Join (`self.next(self.a, self.b)`) | split / join | Multiple named outputs → multiple inputs |
| Foreach (`self.next(self.a, foreach="items")`) | foreach | `DynamicOut` + `.map()` / `.collect()` |
| Conditional (`self.next({...}, condition="key")`) | split-switch | Optional outputs → optional inputs |

Next Steps

How It Works

Understand the compilation and execution process

Compilation Details

Explore what happens during dagster create