
Overview

The dagster trigger command launches a new execution of a Dagster job compiled from your Metaflow flow. This command provides a programmatic way to start runs without using the Dagster UI.

Command Syntax

python my_flow.py dagster trigger [OPTIONS]

Options

--definitions-file (string)
Path to the generated Dagster definitions file. Defaults to <flowname>_dagster.py, where <flowname> is the flow name in lowercase.

--job-name (string)
Name of the Dagster job to execute. Defaults to the flow name. Must match the job name used during dagster create.

--run-param (string)
Flow parameter in key=value form. Repeat the option to pass multiple parameters. Parameters are passed as op-level config to the start op.
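
The default rules above can be written out as a minimal sketch. The helper names default_definitions_file and default_job_name are hypothetical and exist only for illustration; they are not part of the command's API.

```python
def default_definitions_file(flow_name: str) -> str:
    """Default definitions path: the lowercased flow name plus '_dagster.py'."""
    return f"{flow_name.lower()}_dagster.py"


def default_job_name(flow_name: str) -> str:
    """Default Dagster job name: the flow name, unchanged."""
    return flow_name


print(default_definitions_file("MyFlow"))  # myflow_dagster.py
print(default_job_name("MyFlow"))          # MyFlow
```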

Examples

Basic Usage

Trigger a job with default parameters:
python my_flow.py dagster trigger
# Uses: myflow_dagster.py, job name: MyFlow

Specify Definitions File

Trigger a job from a custom definitions file:
python my_flow.py dagster trigger --definitions-file dagster_defs.py

Pass Flow Parameters

Provide parameter values for parametrized flows:
python param_flow.py dagster trigger --run-param greeting=Hello

Custom Job Name

Trigger a job with a custom name:
python my_flow.py dagster trigger \
  --definitions-file production_dagster.py \
  --job-name production_pipeline

Complete Example

Trigger a parametrized flow with all options:
python train_flow.py dagster trigger \
  --definitions-file train_flow_prod.py \
  --job-name production_training \
  --run-param learning_rate=0.001 \
  --run-param epochs=100 \
  --run-param batch_size=32
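
When the command is launched from another script, it can help to assemble the argument list in code. The sketch below uses only the options documented on this page; build_trigger_command is a hypothetical helper name, not something the integration provides.

```python
import sys
from typing import List, Mapping, Optional


def build_trigger_command(
    flow_file: str,
    definitions_file: Optional[str] = None,
    job_name: Optional[str] = None,
    run_params: Optional[Mapping[str, object]] = None,
) -> List[str]:
    """Assemble the argv for `python <flow_file> dagster trigger [OPTIONS]`."""
    cmd = [sys.executable, flow_file, "dagster", "trigger"]
    if definitions_file is not None:
        cmd += ["--definitions-file", definitions_file]
    if job_name is not None:
        cmd += ["--job-name", job_name]
    # Each parameter becomes its own --run-param key=value pair.
    for key, value in (run_params or {}).items():
        cmd += ["--run-param", f"{key}={value}"]
    return cmd


cmd = build_trigger_command(
    "train_flow.py",
    definitions_file="train_flow_prod.py",
    job_name="production_training",
    run_params={"learning_rate": 0.001, "epochs": 100, "batch_size": 32},
)
```

The resulting list can be handed to subprocess.run(cmd). This page does not state the exit status of a failed job, so verify that behavior before relying on check=True.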

Passing Parameters

For flows with metaflow.Parameter definitions, use --run-param to provide values:
# Flow definition
from metaflow import FlowSpec, Parameter, step

class ParamFlow(FlowSpec):
    greeting = Parameter("greeting", default="Hello")
    count = Parameter("count", default=3, type=int)
    
    @step
    def start(self):
        print(f"{self.greeting} x{self.count}")
        self.next(self.end)
    
    @step
    def end(self):
        pass
# Trigger with parameters
python param_flow.py dagster trigger \
  --run-param greeting=Hi \
  --run-param count=5

Output

On successful execution, the command displays:
Dagster job MyFlow executed from myflow_dagster.py.
If the job fails, you’ll see:
MetaflowException: Dagster job 'MyFlow' failed. <error details>

Run Configuration

Parameters are passed to Dagster as op-level config:
# Equivalent to:
ops:
  op_start:
    config:
      greeting: Hi
      count: 5
This configuration is automatically constructed from your --run-param arguments and passed to the start op.
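
As a rough illustration of that construction, the sketch below turns a list of key=value strings into the nested structure shown above. It is not the command's actual implementation: build_run_config is a hypothetical name, and values are kept as strings because this page does not say whether or how the command converts types.

```python
from typing import Dict, Iterable


def build_run_config(run_params: Iterable[str], start_op: str = "op_start") -> Dict:
    """Build Dagster-style op-level run config from key=value strings."""
    config = {}
    for item in run_params:
        # Split on the first "=" only, so values may themselves contain "=".
        key, sep, value = item.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=value, got {item!r}")
        config[key] = value  # assumption: no type conversion in this sketch
    return {"ops": {start_op: {"config": config}}}


run_config = build_run_config(["greeting=Hi", "count=5"])
```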

Dagster Home Directory

The command manages the DAGSTER_HOME environment variable automatically:
  • If DAGSTER_HOME is already set and points to an existing directory, the command uses that configuration.
  • Otherwise, it creates a temporary directory configured with:
    • SQLite storage (single-process, no daemon required)
    • SyncInMemoryRunLauncher (executes runs synchronously)
    • DefaultRunCoordinator (no queue daemon required)
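
The decision described in the list above can be sketched as follows. This covers only the directory selection; it does not write the instance configuration for storage, run launcher, or run coordinator. resolve_dagster_home is a hypothetical name, and setting the environment variable to the temporary path is an assumption about how the command behaves.

```python
import os
import tempfile
from typing import Tuple


def resolve_dagster_home() -> Tuple[str, bool]:
    """Return (path, created), where created is True for a fresh temp directory."""
    home = os.environ.get("DAGSTER_HOME")
    if home and os.path.isdir(home):
        # An existing, valid DAGSTER_HOME is reused as-is.
        return home, False
    # Otherwise fall back to a throwaway directory for this invocation.
    tmp = tempfile.mkdtemp(prefix="dagster_home_")
    os.environ["DAGSTER_HOME"] = tmp  # assumption: the variable is set for the run
    return tmp, True
```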

Metaflow Run ID

The Metaflow run ID is derived deterministically from the Dagster run UUID:
run_id = "dagster-" + hashlib.sha1(dagster_run_id.encode()).hexdigest()[:12]
# Example: dagster-d75a08c398a3
Because the ID is computed purely from the Dagster run UUID, a given Dagster run always maps to the same Metaflow run ID.
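
A self-contained version of the formula above, wrapped in a function so the determinism is easy to check. The function name metaflow_run_id is hypothetical, and the UUID here is randomly generated for illustration.

```python
import hashlib
import uuid


def metaflow_run_id(dagster_run_id: str) -> str:
    """Derive the Metaflow run ID from a Dagster run UUID string."""
    digest = hashlib.sha1(dagster_run_id.encode()).hexdigest()
    # Keep the first 12 hex characters of the SHA-1 digest.
    return "dagster-" + digest[:12]


dagster_run_id = str(uuid.uuid4())  # stand-in for a real Dagster run UUID
first = metaflow_run_id(dagster_run_id)
second = metaflow_run_id(dagster_run_id)
print(first == second)  # True: the same input always yields the same ID
```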

Next Steps

Create Definitions

Compile a flow to a Dagster definitions file

Resume Failed Runs

Resume a failed run and skip completed steps
