
Overview

metaflow-dagster compiles your Metaflow flow’s DAG into a self-contained Dagster definitions file. The compilation process transforms each Metaflow step into a Dagster @op, preserving the execution semantics while enabling Dagster’s orchestration features.

Compilation Process

When you run python my_flow.py dagster create dagster_defs.py, the compiler:
  1. Analyzes the flow graph — Reads the Metaflow FlowSpec and traverses the DAG structure
  2. Generates Dagster code — Creates a complete Python file with ops, jobs, and helpers
  3. Embeds metadata — Bakes in datastore settings, environment configuration, and decorators
  4. Creates definitions — Produces schedules and sensors if decorators like @schedule or @trigger are present
The generated file is self-contained — it includes all the plumbing needed to run your flow through Dagster without additional configuration.

Step to Op Translation

Each Metaflow step becomes a Dagster @op that:

Runs as a subprocess

Every op executes the step via the standard Metaflow CLI:
python my_flow.py step my_step \
  --run-id dagster-abc123 \
  --task-id 1 \
  --input-paths run_id/upstream_step/1
The compiler embeds a _run_step() helper function in the generated file (see metaflow_extensions/dagster/plugins/dagster/dagster_compiler.py:356) that:
  • Constructs the CLI command with all necessary flags
  • Forwards retry counts, tags, and environment variables
  • Handles conda environments by swapping the Python interpreter
  • Streams logs through Dagster’s log panel
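The helper can be pictured as a thin wrapper around a subprocess call. The sketch below is illustrative rather than the generated code: the function name `_run_step_sketch` and the constant values are stand-ins, and the command layout follows the CLI invocation shown above.

```python
import os

# Illustrative constants; the generated file bakes in the real values.
FLOW_FILE = "my_flow.py"
METAFLOW_STEP_ENV = {"METAFLOW_HOME": os.path.expanduser("~/.metaflow")}

def _run_step_sketch(step_name, run_id, input_paths, task_id,
                     retry_count=0, extra_env=None):
    """Build the `python my_flow.py step <name> ...` command (simplified)."""
    cmd = [
        "python", FLOW_FILE, "step", step_name,
        "--run-id", run_id,
        "--task-id", task_id,
        "--input-paths", input_paths,
        "--retry-count", str(retry_count),
    ]
    # Later keys win: decorator-supplied vars override the baked-in settings.
    env = {**os.environ, **METAFLOW_STEP_ENV, **(extra_env or {})}
    # The real helper then runs subprocess.run(cmd, env=env, ...) and
    # streams stdout/stderr into Dagster's log panel; here we just
    # return the constructed command for inspection.
    return cmd, env

cmd, env = _run_step_sketch("train", "dagster-abc123",
                            "dagster-abc123/start/1", "1")
```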

Passes artifacts via --input-paths

Metaflow’s artifact passing mechanism remains intact. The generated op code:
  • Receives the parent task’s pathspec (e.g., run_id/start/1)
  • Passes it to the metaflow step command via --input-paths
  • For join steps, compresses multiple paths using compress_list()
# Example: join op receives multiple branch outputs
input_paths = compress_list([branch_a_path, branch_b_path])
task_path = _run_step(
    context, "join", run_id, input_paths, "1",
    retry_count=context.retry_number,
    max_user_code_retries=0,
    tags=[],
    extra_env=None,
)
Artifacts are stored in the Metaflow datastore (local filesystem or S3/Azure). Dagster never loads them — it only passes pathspecs between ops.
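The packing done for join steps can be sketched with a simplified stand-in for Metaflow's `compress_list`/`decompress_list` pair (the real helpers live in `metaflow.util`): join the pathspecs with a separator, and zlib-compress the result once it grows long. The marker character and threshold below are assumptions for illustration.

```python
import base64
import zlib

ZLIB_MARKER = "!"  # assumed prefix distinguishing compressed payloads

def compress_paths(paths, zlib_min=500):
    """Pack a list of pathspecs into one string (simplified sketch)."""
    joined = ",".join(paths)
    if len(joined) <= zlib_min:
        return joined
    payload = base64.b64encode(zlib.compress(joined.encode())).decode()
    return ZLIB_MARKER + payload

def decompress_paths(blob):
    """Reverse compress_paths back into the original list."""
    if blob.startswith(ZLIB_MARKER):
        blob = zlib.decompress(base64.b64decode(blob[1:])).decode()
    return blob.split(",")

# A wide foreach produces enough paths to trigger compression.
branches = [f"dagster-abc123/branch_{i}/1" for i in range(40)]
packed = compress_paths(branches)
```

Either way, only a short string travels between ops; the artifacts themselves stay in the datastore.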

Preserves decorators

Metaflow decorators like @retry, @timeout, @resources, and @environment are translated to Dagster equivalents:

@retry → RetryPolicy

From dagster_compiler.py:866:
@retry(times=3, minutes_between_retries=2)
@step
def train(self):
    ...
Generates:
@op(retry_policy=RetryPolicy(max_retries=3, delay=120))
def op_train(context: OpExecutionContext):
    ...
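The mapping behind this translation is straightforward: `times` becomes `max_retries`, and `minutes_between_retries` is converted to a delay in seconds. The function name below is illustrative, not the compiler's.

```python
# Sketch of the @retry -> RetryPolicy argument mapping shown above.
def retry_to_policy_args(times: int = 3, minutes_between_retries: int = 2):
    return {
        "max_retries": times,
        "delay": minutes_between_retries * 60,  # RetryPolicy takes seconds
    }

args = retry_to_policy_args(times=3, minutes_between_retries=2)
```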

@timeout → op tags

From dagster_compiler.py:882:
@timeout(seconds=300)
@step
def process(self):
    ...
Generates:
@op(tags={"dagster/op_execution_timeout": "300"})
def op_process(context: OpExecutionContext):
    ...

@environment → subprocess env vars

From dagster_compiler.py:894:
@environment(vars={"TOKENIZERS_PARALLELISM": "false"})
@step
def embed(self):
    ...
The generated op passes these variables to the metaflow step subprocess.
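The merge order matters: decorator-supplied variables should win over both the inherited shell environment and the baked-in Metaflow settings. A minimal sketch, with illustrative values:

```python
import os

# Vars from @environment(vars={"TOKENIZERS_PARALLELISM": "false"}).
decorator_vars = {"TOKENIZERS_PARALLELISM": "false"}

# Baseline environment baked into the generated file (illustrative value).
METAFLOW_STEP_ENV = {"METAFLOW_HOME": "/home/user/.metaflow"}

# Later keys win, so decorator vars take precedence in the subprocess.
subprocess_env = {**os.environ, **METAFLOW_STEP_ENV, **decorator_vars}
```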

Execution Flow

Here’s what happens when you run a compiled flow:

1. Start op initializes the run

The op_start function:
run_id = _make_run_id(context.run_id)  # "dagster-abc123def456"
parameters = {"greeting": config.greeting, "count": config.count}
params_path = _run_init(context, run_id, "params", parameters)
task_path = _run_step(context, "start", run_id, params_path, "1", ...)
From dagster_compiler.py:235, _run_init() creates the _parameters task:
python my_flow.py init --run-id dagster-abc123 --task-id params
Metaflow reads parameter overrides from METAFLOW_INIT_<NAME> environment variables.
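Encoding the config values under that convention could look like the sketch below. The `METAFLOW_INIT_<NAME>` naming is from the docs above; the helper name and the JSON handling for non-string values are assumptions.

```python
import json

def params_to_env(parameters):
    """Turn a parameters dict into METAFLOW_INIT_<NAME> env vars (sketch)."""
    env = {}
    for name, value in parameters.items():
        key = "METAFLOW_INIT_" + name.upper().replace("-", "_")
        # Strings pass through; other types are JSON-encoded (assumption).
        env[key] = value if isinstance(value, str) else json.dumps(value)
    return env

env = params_to_env({"greeting": "Hello", "count": 5})
```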

2. Downstream ops execute sequentially or in parallel

Each op:
  • Receives the upstream task pathspec
  • Runs metaflow step <name> --input-paths <upstream_pathspec>
  • Emits the new task pathspec for the next op
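Stripped of the Dagster decorators, a generated mid-graph op follows a pass-the-pathspec pattern. In this sketch `_run_step` is stubbed out so the shape is visible on its own; in the real file it invokes the Metaflow CLI as described above.

```python
def _run_step(step_name, run_id, input_paths, task_id):
    # Stub: pretend the step ran and return the new task's pathspec.
    return f"{run_id}/{step_name}/{task_id}"

def op_process(start: str) -> str:
    run_id = start.split("/")[0]  # upstream pathspec: run_id/step/task_id
    task_path = _run_step("process", run_id, start, "1")
    return task_path              # handed to the next op as its input

next_input = op_process("dagster-abc123/start/1")
```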

3. Artifact metadata is attached to Dagster outputs

After each step completes, the op calls _add_step_metadata() (from dagster_compiler.py:518), which:
  • Reads 0.data.json from the local datastore
  • Extracts artifact keys (excluding internal ones like _foreach_num_splits)
  • Emits a retrieval snippet to the Dagster UI:
from metaflow import Task
task = Task('MyFlow/dagster-abc123/start/1')
task['value'].data  # value
task['message'].data  # message
The retrieval snippet includes a copy button in the Dagster UI, making it easy to fetch artifacts from a Jupyter notebook or Python script.
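The metadata step can be sketched as: read the task's `0.data.json`, drop internal keys, and render the retrieval snippet. The flat key layout assumed for `0.data.json` and the function name are simplifications of what the description above implies.

```python
import json
import os
import tempfile

def artifact_snippet(data_json_path, flow_name, task_path):
    """Render the copy-pasteable retrieval snippet (simplified sketch)."""
    with open(data_json_path) as f:
        data = json.load(f)
    # Skip internal bookkeeping artifacts such as _foreach_num_splits.
    keys = [k for k in data if not k.startswith("_")]
    lines = ["from metaflow import Task",
             f"task = Task('{flow_name}/{task_path}')"]
    lines += [f"task['{k}'].data  # {k}" for k in sorted(keys)]
    return "\n".join(lines)

# Demo with a stand-in 0.data.json (real files live in the datastore).
tmp = tempfile.mkdtemp()
path = os.path.join(tmp, "0.data.json")
with open(path, "w") as f:
    json.dump({"value": 1, "message": 2, "_foreach_num_splits": 3}, f)
snippet = artifact_snippet(path, "MyFlow", "dagster-abc123/start/1")
```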

Generated File Structure

From the compiler header comments (dagster_compiler.py:10-19), the generated file contains:
# 1. Preamble + imports
import subprocess, os, json
from dagster import op, job, Definitions, ...

# 2. Constants
FLOW_FILE = '/path/to/my_flow.py'
FLOW_NAME = 'MyFlow'
METAFLOW_TOP_ARGS = ['--quiet', '--metadata=local', '--datastore=local', ...]
METAFLOW_STEP_ENV = {'METAFLOW_HOME': '/home/user/.metaflow', ...}
STEP_DECORATOR_SPECS = {'start': ['retry:times=3'], ...}
STEP_WITH_DECORATORS = {'start': ['retry:times=3', 'resources:cpu=4'], ...}
STEP_CONDA_ENV_IDS = {'start': 'a1b2c3d4', 'process': None, ...}

# 3. Helpers
def _make_run_id(dagster_run_id: str) -> str: ...
def _run_init(context, run_id, params_task_id, parameters): ...
def _run_step(context, step_name, run_id, input_paths, task_id, ...): ...
def _get_foreach_splits(run_id, step_name, task_id) -> int: ...
def _get_condition_branch(run_id, step_name, task_id) -> str: ...
def _add_step_metadata(context, task_path, output_name): ...

# 4. Config (if flow has Parameters)
class MyFlowConfig(Config):
    greeting: str = "Hello"
    count: int = 5

# 5. Ops (one per step)
@op(out=Out(str))
def op_start(context: OpExecutionContext, config: MyFlowConfig) -> str:
    ...

@op(ins={"start": In(str)}, out=Out(str))
def op_process(context: OpExecutionContext, start: str) -> str:
    ...

@op(ins={"process": In(str)})
def op_end(context: OpExecutionContext, process: str) -> None:
    ...

# 6. Job
@job
def MyFlow():
    end_result = op_end(op_process(op_start()))

# 7. Schedule (optional)
MyFlow_schedule = ScheduleDefinition(
    job=MyFlow,
    cron_schedule="0 0 * * *",
)

# 8. Sensors (optional, from @trigger / @trigger_on_finish)
@sensor(...)
def MyFlow_on_finish_0(context: SensorEvaluationContext):
    ...

# 9. Definitions
defs = Definitions(
    jobs=[MyFlow],
    schedules=[MyFlow_schedule],
    sensors=[MyFlow_on_finish_0],
)

Why Subprocess Execution?

Running each step as a subprocess via metaflow step ensures:
  • Full compatibility with Metaflow’s runtime hooks (decorators like @conda, @sandbox, @batch)
  • Artifact isolation — each step writes to its own task directory in the datastore
  • Retry semantics — Metaflow’s retry counter is passed correctly via --retry-count
  • Environment consistency — the same Python interpreter and packages are used as in native Metaflow runs
The generated file never imports your flow class. It only invokes the CLI, so changes to your step code (like adding prints or assertions) are picked up immediately without regenerating the Dagster definitions. Structural changes, such as adding or removing steps, do require re-running dagster create, because the DAG shape is baked into the generated ops and job.

Next Steps

Graph Shapes

Learn how different Metaflow patterns translate to Dagster ops

Compilation Details

Explore what happens during dagster create