Skip to main content

Overview

When you run `python my_flow.py dagster create dagster_defs.py`, the compiler analyzes your flow and emits a complete, self-contained Dagster definitions file. This file includes all the plumbing needed to run your flow through Dagster.

Command Syntax

python my_flow.py dagster create <output_file> [options]

Common Options

Option | Description
--name <job_name> | Override the Dagster job name (defaults to flow class name)
--with <decorator> | Inject step decorators at deploy time (e.g., --with=sandbox)
--tag <key:value> | Attach Metaflow tags forwarded to every step subprocess
--workflow-timeout <seconds> | Cap the total wall-clock time for the job
--namespace <name> | Set the Metaflow namespace (for flow isolation)

Example

python train_flow.py dagster create train_dagster.py \
  --name nightly_training \
  --with=sandbox \
  --tag env:prod \
  --tag version:2 \
  --workflow-timeout 7200
The generated file is deterministic — running the same command twice produces identical output (modulo timestamp comments).

Generated File Structure

From the compiler source (dagster_compiler.py:10-19), the file contains nine sections:

1. Preamble + Imports

A header comment identifies the source flow and regeneration command:
"""
Dagster definitions generated by metaflow-dagster.
Flow   : TrainFlow
Job    : nightly_training
Source : /home/user/train_flow.py

DO NOT EDIT — re-generate with:
    python train_flow.py dagster create train_dagster.py
"""
import atexit
import hashlib
import json
import os
import subprocess
import sys
from typing import Dict, List, Optional

from dagster import (
    Config,
    DefaultSensorStatus,
    Definitions,
    DynamicOut,
    DynamicOutput,
    In,
    MetadataValue,
    OpExecutionContext,
    Out,
    Output,
    RetryPolicy,
    RunRequest,
    ScheduleDefinition,
    SensorDefinition,
    SensorEvaluationContext,
    job,
    op,
    sensor,
)
from metaflow.decorators import extract_step_decorator_from_decospec
from metaflow.util import compress_list

2. Constants

Metadata and datastore settings are baked in at compile time (from dagster_compiler.py:76-98):
# ── Flow constants ─────────────────────────────────────────────────────────────
FLOW_FILE = '/home/user/train_flow.py'
FLOW_NAME = 'TrainFlow'

# When set, _run_step passes --clone-run-id to Metaflow so completed tasks are
# reused and only failed/pending steps are re-executed (resume support).
ORIGIN_RUN_ID: Optional[str] = None

# Metaflow CLI top-level flags embedded at compile time
METAFLOW_TOP_ARGS: List[str] = [
    '--quiet',
    '--no-pylint',
    '--metadata=service',
    '--environment=conda',
    '--datastore=s3',
    '--datastore-root=s3://my-bucket/metaflow',
    '--event-logger=nullSidecarLogger',
    '--monitor=nullSidecarMonitor',
    '--namespace=prod',
]

# Metaflow env-vars forwarded to every subprocess
METAFLOW_STEP_ENV: Dict[str, str] = {
    'METAFLOW_HOME': '/home/user/.metaflow',
    'METAFLOW_DATASTORE_SYSROOT_S3': 's3://my-bucket/metaflow',
}

# Per-step runtime decorator specs and retry limits used to honor runtime_step_cli hooks.
STEP_DECORATOR_SPECS: Dict[str, List[str]] = {
    'start': ['retry:times=3'],
    'train': ['timeout:seconds=300', 'resources:cpu=4,memory=8000'],
    'end': [],
}

# Decorators appended to each step command as --with=... flags: deploy-time
# injections (--with=sandbox here) plus the step's own decorator specs.
STEP_WITH_DECORATORS: Dict[str, List[str]] = {
    'start': ['sandbox', 'retry:times=3'],
    'train': ['sandbox', 'timeout:seconds=300', 'resources:cpu=4,memory=8000'],
    'end': ['sandbox'],
}

# Per-step conda environment IDs, embedded at compile time.
STEP_CONDA_ENV_IDS: Dict[str, Optional[str]] = {
    'start': 'a1b2c3d4e5f6',
    'train': 'a1b2c3d4e5f6',
    'end': None,  # no pinned conda env for this step
}
These constants ensure every step subprocess uses the same backend configuration as the compile-time environment, preventing accidental metadata or datastore mismatches.

3. Helpers

From the preamble template (dagster_compiler.py:100-561):
def _make_run_id(dagster_run_id: str) -> str:
    """Derive a short, deterministic Metaflow run-id from Dagster's UUID."""
    # sha1 truncated to 12 hex chars keeps the run-id short while making
    # collisions across distinct Dagster run UUIDs very unlikely.
    return "dagster-" + hashlib.sha1(dagster_run_id.encode()).hexdigest()[:12]

def _run_init(context, run_id, params_task_id, parameters) -> str:
    """Create the _parameters task. Returns its full pathspec.

    Args:
        context: Dagster op execution context (used for logging).
        run_id: Metaflow run-id derived from the Dagster run.
        params_task_id: Task-id to assign to the _parameters task.
        parameters: Mapping of flow Parameter names to values, or None.
    """
    cmd = [
        sys.executable, FLOW_FILE,
        *METAFLOW_TOP_ARGS,
        "init", "--run-id", run_id, "--task-id", params_task_id
    ]
    if ORIGIN_RUN_ID:
        cmd += ["--clone-run-id", ORIGIN_RUN_ID]
    # Flow parameters travel to "init" via METAFLOW_INIT_* environment
    # variables rather than CLI flags.
    extra = {
        "METAFLOW_INIT_" + k.upper().replace("-", "_"): str(v)
        for k, v in (parameters or {}).items()
    }
    _run_cmd(context, cmd, extra or None)
    return f"{run_id}/_parameters/{params_task_id}"

def _run_step(context, step_name, run_id, input_paths, task_id,
              retry_count=0, max_user_code_retries=0, tags=None,
              split_index=None, extra_env=None) -> str:
    """Execute one Metaflow step. Returns its full task pathspec.

    retry_count and max_user_code_retries are forwarded to the Metaflow CLI;
    split_index is supplied only for foreach fan-out tasks.
    """
    # Builds the CLI command with decorators, conda interpreter, etc.
    # (See dagster_compiler.py:356 for full implementation)
    ...

def _get_foreach_splits(run_id, step_name, task_id) -> int:
    """Read the foreach cardinality directly from the local datastore files."""
    # Reads _foreach_num_splits from 0.data.json
    # NOTE(review): presumably drives the DynamicOutput fan-out in foreach
    # jobs (see the DynamicOut import) — confirm against the job body.
    ...

def _get_condition_branch(run_id, step_name, task_id) -> str:
    """Read the condition branch taken from the local datastore files."""
    # Reads the _transition artifact, which records the branch the step chose
    ...

def _add_step_metadata(context, task_path, output_name=None) -> None:
    """Emit artifact-key metadata to the Dagster UI."""
    # Reads the task's 0.data.json and attaches the artifact-key list plus a
    # copy-paste retrieval snippet as Dagster output metadata
    ...

4. Config Class (if flow has Parameters)

From dagster_compiler.py:747-771:
class TrainFlowConfig(Config):
    """Dagster run config mirroring TrainFlow's Metaflow Parameters."""
    learning_rate: float = 0.001  # Learning rate for training
    batch_size: int = 32  # Batch size
    epochs: int = 10  # Number of epochs
The compiler extracts parameter defaults via deploy_time_eval() and infers types (bool, int, float, str).
If your flow has no Parameter definitions, this section is omitted and ops don’t take a config argument.

5. Ops (one per step)

From dagster_compiler.py:843-1203, each step generates an @op:
# Start op (handles init + step)
@op(out=Out(str))
def op_start(context: OpExecutionContext, config: TrainFlowConfig) -> str:
    """Create the _parameters task, run the 'start' step, return its pathspec."""
    run_id = _make_run_id(context.run_id)
    parameters = {"learning_rate": config.learning_rate, "batch_size": config.batch_size, "epochs": config.epochs}
    params_path = _run_init(context, run_id, "params", parameters)
    task_path = _run_step(
        context, "start", run_id, params_path, "1",
        retry_count=context.retry_number,
        max_user_code_retries=3,  # from the step's retry:times=3 decorator
        tags=["env:prod", "version:2"],
        extra_env=None,
    )
    _add_step_metadata(context, task_path)
    return task_path

# Linear op with retry policy and timeout
@op(
    ins={"start": In(str)},
    out=Out(str),
    retry_policy=RetryPolicy(max_retries=3),
    tags={"dagster/op_execution_timeout": "300"}  # from timeout:seconds=300
)
def op_train(context: OpExecutionContext, start: str) -> str:
    """Run the 'train' step, chained after op_start via its task pathspec."""
    # Upstream pathspec has the form "<run_id>/<step>/<task_id>"; reuse its run-id.
    run_id = start.split("/")[0]
    task_path = _run_step(context, "train", run_id, start, "1",
        retry_count=context.retry_number, max_user_code_retries=0,
        tags=["env:prod", "version:2"], extra_env=None)
    _add_step_metadata(context, task_path)
    return task_path

# End op
@op(ins={"train": In(str)})
def op_end(context: OpExecutionContext, train: str) -> None:
    """Run the terminal 'end' step; no Dagster output is produced."""
    run_id = train.split("/")[0]
    _run_step(context, "end", run_id, train, "1",
        retry_count=context.retry_number, max_user_code_retries=0,
        tags=["env:prod", "version:2"], extra_env=None)

6. Job Definition

The @job function wires together all ops in topological order:
@job
def nightly_training():
    """Linear pipeline: start -> train -> end."""
    op_end(op_train(op_start()))
For more complex graphs (splits, foreachs), the job body includes fan-out/fan-in logic:
@job
def BranchingFlow():
    """Static split: start fans out to branch_a/branch_b; join fans back in."""
    start_outputs = op_start()
    a = op_branch_a(start_outputs.branch_a)
    b = op_branch_b(start_outputs.branch_b)
    op_end(op_join(branch_a=a, branch_b=b))

7. Schedule (if @schedule decorator present)

If your flow has a @schedule decorator, the compiler emits a ScheduleDefinition:
# Cron schedule translated from the flow's @schedule decorator (here: daily at midnight).
nightly_training_schedule = ScheduleDefinition(
    job=nightly_training,
    cron_schedule="0 0 * * *",  # From @schedule(daily=True) or cron expression
)
Metaflow’s @schedule decorator supports daily, hourly, weekly, or raw cron strings. The compiler translates these to Dagster’s cron format.

8. Sensors (from @trigger / @trigger_on_finish)

If your flow has event-driven decorators:
@sensor(job=DownstreamFlow, default_status=DefaultSensorStatus.RUNNING)
def DownstreamFlow_on_finish_0(context: SensorEvaluationContext):
    """
    Fire DownstreamFlow when UpstreamFlow completes successfully.
    Auto-generated from @trigger_on_finish(flow="UpstreamFlow").
    """
    # Polls Dagster run history for UpstreamFlow successes
    # (implementation elided here)
    ...
For @trigger(event=...), the compiler emits a stub with a TODO comment:
@sensor(job=MyFlow, default_status=DefaultSensorStatus.STOPPED)
def MyFlow_event_0(context: SensorEvaluationContext):
    """
    TODO: Wire this sensor to your event source (webhook, queue, etc.).
    Auto-generated from @trigger(event="data.ready").
    """
    # Ships STOPPED so the unimplemented stub never fires by accident.
    # yield RunRequest(...)
    pass

9. Definitions

The final Definitions object bundles everything:
# The single Definitions object that Dagster loads from this file.
defs = Definitions(
    jobs=[nightly_training],
    schedules=[nightly_training_schedule],
    sensors=[DownstreamFlow_on_finish_0],
)
This is what Dagster loads when you run dagster dev -f train_dagster.py.

Metadata and Datastore Baking

The compiler embeds your Metaflow backend configuration at compile time (from dagster_compiler.py:659-676):
def _build_top_args(self) -> list[str]:
    """Assemble the top-level Metaflow CLI flags baked into the generated file.

    Flow-level decorator options come first, then the backend settings
    (metadata, environment, datastore, logging/monitoring) captured at
    compile time, and finally the namespace when one is configured.
    """
    top_opts_dict: dict = {}
    # Flow-level decorators may contribute their own top-level CLI options.
    for deco in flow_decorators(self.flow):
        top_opts_dict.update(deco.get_top_level_options())
    top_opts = list(dict_to_cli_options(top_opts_dict))
    args = top_opts + [
        "--quiet",
        "--no-pylint",
        f"--metadata={self.metadata.TYPE}",
        f"--environment={self.environment.TYPE}",
        f"--datastore={self.flow_datastore.TYPE}",
        f"--datastore-root={self.flow_datastore.datastore_root}",
        f"--event-logger={self.event_logger.TYPE}",
        f"--monitor={self.monitor.TYPE}",
    ]
    if self.namespace:
        args.append(f"--namespace={self.namespace}")
    return args
This ensures every metaflow step subprocess uses the same backend as the deployment environment.

Example: Using Remote S3 Datastore

Before running dagster create, configure Metaflow:
export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow

python my_flow.py dagster create my_flow_dagster.py
The generated file will include:
METAFLOW_TOP_ARGS: List[str] = [
    '--metadata=service',
    '--datastore=s3',
    '--datastore-root=s3://my-bucket/metaflow',
    ...
]
If you switch datastores after generating the file, your Dagster runs will write to the old datastore embedded in the constants. Re-run dagster create to update.

Decorator Forwarding

From dagster_compiler.py:678-697, the compiler extracts decorator information for each step:

STEP_DECORATOR_SPECS

Full decorator specs from the flow source (e.g., retry:times=3):
STEP_DECORATOR_SPECS: Dict[str, List[str]] = {
    'train': ['timeout:seconds=300', 'resources:cpu=4,memory=8000'],
}
These are used by _run_step() to invoke runtime_step_cli() hooks (for decorators like @sandbox or @conda).

STEP_WITH_DECORATORS

Decorators injected at deploy time via --with=<decorator>, plus @resources hints:
STEP_WITH_DECORATORS: Dict[str, List[str]] = {
    'train': ['sandbox', 'resources:cpu=4,memory=8000'],
}
These are appended to the metaflow step command as --with=... flags.

Resume Support

If you generate a file with --origin-run-id (used by dagster resume), the ORIGIN_RUN_ID constant is set:
ORIGIN_RUN_ID: Optional[str] = 'dagster-abc123def456'
Every _run_init and _run_step call then passes --clone-run-id <ORIGIN_RUN_ID> to Metaflow, which reuses completed tasks and re-executes only failed/pending steps.

Generated Code Example

Here’s a minimal generated file for LinearFlow:
"""
Dagster definitions generated by metaflow-dagster.
Flow   : LinearFlow
Job    : LinearFlow
Source : /home/user/linear_flow.py

DO NOT EDIT — re-generate with:
    python linear_flow.py dagster create linear_flow_dagster.py
"""
import subprocess, os, json, sys, hashlib
from typing import Dict, List, Optional
from dagster import (
    OpExecutionContext, In, Out, Output, job, op, Definitions, MetadataValue
)
from metaflow.util import compress_list

# ── Compile-time constants: flow identity and Metaflow backend flags ──
FLOW_FILE = '/home/user/linear_flow.py'
FLOW_NAME = 'LinearFlow'
ORIGIN_RUN_ID: Optional[str] = None  # set only for resume (clone) runs
METAFLOW_TOP_ARGS: List[str] = ['--quiet', '--no-pylint', '--metadata=local', '--datastore=local', '--datastore-root=/home/user/.metaflow', '--environment=local', '--event-logger=nullSidecarLogger', '--monitor=nullSidecarMonitor']
METAFLOW_STEP_ENV: Dict[str, str] = {}
STEP_DECORATOR_SPECS: Dict[str, List[str]] = {'start': [], 'process': [], 'end': []}
STEP_WITH_DECORATORS: Dict[str, List[str]] = {'start': [], 'process': [], 'end': []}
STEP_CONDA_ENV_IDS: Dict[str, Optional[str]] = {'start': None, 'process': None, 'end': None}

def _make_run_id(dagster_run_id: str) -> str:
    return "dagster-" + hashlib.sha1(dagster_run_id.encode()).hexdigest()[:12]

def _build_env(extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Subprocess environment: os.environ overlaid with the baked-in
    Metaflow vars, then any per-call extras (highest precedence)."""
    merged = dict(os.environ)
    merged.update(METAFLOW_STEP_ENV)
    merged.update(extra or {})
    return merged

def _run_cmd(context, cmd, extra_env=None):
    """Run one Metaflow CLI command, mirroring its output into Dagster logs.

    Raises RuntimeError (carrying the tail of stderr) on non-zero exit.
    """
    context.log.info("$ " + " ".join(str(c) for c in cmd))
    workdir = os.path.dirname(FLOW_FILE) or "."
    proc = subprocess.Popen(
        cmd,
        env=_build_env(extra_env),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        cwd=workdir,
    )
    stdout, stderr = proc.communicate()
    # Forward both streams so step output shows up in the Dagster UI.
    for stream in (stdout, stderr):
        if stream.strip():
            context.log.info(stream)
    if proc.returncode != 0:
        raise RuntimeError(f"Command failed (exit {proc.returncode}):\n{stderr[-2000:]}")

def _run_init(context, run_id, params_task_id, parameters=None) -> str:
    """Create the run's _parameters task and return its pathspec."""
    cmd = [sys.executable, FLOW_FILE, *METAFLOW_TOP_ARGS,
           "init", "--run-id", run_id, "--task-id", params_task_id]
    if ORIGIN_RUN_ID:
        cmd.extend(["--clone-run-id", ORIGIN_RUN_ID])
    # Flow parameters travel via METAFLOW_INIT_* environment variables.
    env_overrides = {}
    for key, value in (parameters or {}).items():
        env_overrides["METAFLOW_INIT_" + key.upper().replace("-", "_")] = str(value)
    _run_cmd(context, cmd, env_overrides or None)
    return f"{run_id}/_parameters/{params_task_id}"

def _run_step(context, step_name, run_id, input_paths, task_id, retry_count=0, max_user_code_retries=0, tags=None, split_index=None, extra_env=None) -> str:
    """Execute one Metaflow step as a subprocess; return its task pathspec."""
    with_flags = [f"--with={d}" for d in STEP_WITH_DECORATORS.get(step_name, [])]
    cmd = [sys.executable, FLOW_FILE]
    cmd += METAFLOW_TOP_ARGS
    cmd += with_flags
    cmd += [
        "step", step_name,
        "--run-id", run_id,
        "--task-id", task_id,
        "--input-paths", input_paths,
        "--retry-count", str(retry_count),
        "--max-user-code-retries", str(max_user_code_retries),
    ]
    for tag in (tags or []):
        cmd += ["--tag", tag]
    if split_index is not None:
        # Only foreach fan-out tasks carry a split index.
        cmd += ["--split-index", str(split_index)]
    if ORIGIN_RUN_ID:
        cmd += ["--clone-run-id", ORIGIN_RUN_ID]
    _run_cmd(context, cmd, extra_env)
    return f"{run_id}/{step_name}/{task_id}"

def _get_ds_root() -> str:
    """Datastore root baked into METAFLOW_TOP_ARGS, else the local sysroot env var."""
    prefix = "--datastore-root="
    for arg in METAFLOW_TOP_ARGS:
        if arg.startswith(prefix):
            return arg[len(prefix):]
    return os.environ.get("METAFLOW_DATASTORE_SYSROOT_LOCAL", "")

def _add_step_metadata(context, task_path, output_name=None):
    """Attach Metaflow artifact info to the op's Dagster output metadata.

    Reads the task's 0.data.json from the datastore and publishes the list of
    artifact keys plus a copy-paste snippet that retrieves them with the
    Metaflow client. Best-effort: failures are logged, never raised.
    """
    parts = task_path.split("/")
    if len(parts) < 3:
        return
    run_id, step_name, task_id = parts[0], parts[1], parts[2]
    data_json_path = os.path.join(_get_ds_root(), FLOW_NAME, run_id, step_name, task_id, "0.data.json")
    try:
        with open(data_json_path) as fh:
            objects = json.load(fh).get("objects", {})
    except Exception as _exc:
        # A missing or remote datastore must not fail the op — metadata only.
        context.log.warning(f"Could not read artifact metadata for {task_path}: {_exc}")
        return
    # Leading-underscore keys are Metaflow internals — skip them.
    keys = [k for k in objects if not k.startswith("_")]
    if not keys:
        return
    pathspec = "/".join([FLOW_NAME, run_id, step_name, task_id])
    lines = ["from metaflow import Task", "task = Task(" + repr(pathspec) + ")"]
    lines += ["task[" + repr(k) + "].data  # " + k for k in keys]
    snippet = "\n".join(lines)
    meta = {"artifact_keys": MetadataValue.md("**Metaflow artifacts:** " + ", ".join("`" + k + "`" for k in keys)), "retrieve (copy ↓)": MetadataValue.text(snippet)}
    kwargs = {"output_name": output_name} if output_name is not None else {}
    try:
        context.add_output_metadata(meta, **kwargs)
    except Exception as _exc:
        context.log.warning(f"Could not attach output metadata for {task_path}: {_exc}")

@op(out=Out(str))
def op_start(context: OpExecutionContext) -> str:
    """Create the _parameters task and run the 'start' step; return its pathspec."""
    run_id = _make_run_id(context.run_id)
    parameters = {}  # LinearFlow declares no Parameters
    params_path = _run_init(context, run_id, "params", parameters)
    task_path = _run_step(context, "start", run_id, params_path, "1", retry_count=context.retry_number, max_user_code_retries=0, tags=[], extra_env=None)
    _add_step_metadata(context, task_path)
    return task_path

@op(ins={"start": In(str)}, out=Out(str))
def op_process(context: OpExecutionContext, start: str) -> str:
    """Run the 'process' step, downstream of op_start."""
    # `start` is the upstream task pathspec "<run_id>/start/1"; reuse its run-id.
    run_id = start.split("/")[0]
    task_path = _run_step(context, "process", run_id, start, "1", retry_count=context.retry_number, max_user_code_retries=0, tags=[], extra_env=None)
    _add_step_metadata(context, task_path)
    return task_path

@op(ins={"process": In(str)})
def op_end(context: OpExecutionContext, process: str) -> None:
    """Run the terminal 'end' step; produces no Dagster output."""
    run_id = process.split("/")[0]
    _run_step(context, "end", run_id, process, "1", retry_count=context.retry_number, max_user_code_retries=0, tags=[], extra_env=None)

@job
def LinearFlow():
    """start -> process -> end, wired by task-pathspec data dependencies."""
    op_end(op_process(op_start()))

# Entry point loaded by `dagster dev -f linear_flow_dagster.py`.
defs = Definitions(jobs=[LinearFlow], schedules=[], sensors=[])
The generated file is standalone — you can copy it to a different machine (with the same datastore access) and run it without the original flow source.

Next Steps

How It Works

Understand the compilation and execution process

Graph Shapes

Learn how different Metaflow patterns translate to Dagster