metaflow-dagster bakes Metaflow configuration into the generated Dagster definitions file at compile time. This ensures every step subprocess uses consistent metadata and datastore backends.
How configuration works
When you run python my_flow.py dagster create, the compiler:
Reads the current Metaflow environment configuration
Gathers relevant settings (metadata service, datastore, etc.)
Embeds them as constants in the generated file
Forwards settings to every metaflow step subprocess
This “bake-in” approach ensures deterministic behavior: the generated file captures the exact configuration at creation time.
By default, metaflow-dagster uses whatever metadata and datastore backends are active in your Metaflow environment.
Using command-line flags
Configure backends explicitly when creating the definitions file:
python my_flow.py \
--metadata=service \
--datastore=s3 \
dagster create my_flow_dagster.py
Available options:
Flag          Options                 Description
--metadata    local, service          Metadata backend
--datastore   local, s3, azure, gs    Artifact storage
Using environment variables
Alternatively, set environment variables before compilation:
export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_SERVICE_URL=https://metadata.example.com
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow
python my_flow.py dagster create my_flow_dagster.py
The compiler embeds these values in the generated file.
Configuration embedding
The generated file includes:
# Metaflow CLI top-level flags embedded at compile time
METAFLOW_TOP_ARGS: List[str] = [
    '--quiet',
    '--no-pylint',
    '--metadata=service',
    '--environment=conda',
    '--datastore=s3',
    '--datastore-root=s3://my-bucket/metaflow',
    '--event-logger=nullSidecarLogger',
    '--monitor=nullSidecarMonitor',
]
# Metaflow env-vars forwarded to every subprocess
METAFLOW_STEP_ENV: Dict[str, str] = {
    'METAFLOW_DEFAULT_METADATA': 'service',
    'METAFLOW_DEFAULT_DATASTORE': 's3',
    'METAFLOW_SERVICE_URL': 'https://metadata.example.com',
    'METAFLOW_DATASTORE_SYSROOT_S3': 's3://my-bucket/metaflow',
}
Every step subprocess inherits these settings automatically.
Once the definitions file is generated, configuration is locked in. To change backends, regenerate the file with new flags or environment variables.
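The plumbing above can be sketched in a few lines. This is an illustrative model, not the actual metaflow-dagster internals: the helper names (build_step_command, build_step_env) and the exact step-invocation flags are assumptions; only the two embedded constants mirror the examples on this page.

```python
import os

# Constants as they would appear in a generated definitions file
# (values taken from the examples on this page)
METAFLOW_TOP_ARGS = ['--quiet', '--metadata=service', '--datastore=s3']
METAFLOW_STEP_ENV = {
    'METAFLOW_DEFAULT_METADATA': 'service',
    'METAFLOW_DEFAULT_DATASTORE': 's3',
}

def build_step_command(flow_file, step_name, run_id, task_id):
    # Hypothetical: prepend the baked-in top-level flags to a step invocation
    return (['python', flow_file] + METAFLOW_TOP_ARGS
            + ['step', step_name, '--run-id', run_id, '--task-id', task_id])

def build_step_env():
    # Merge the baked-in settings over the current process environment,
    # so every step subprocess sees the compile-time configuration
    env = dict(os.environ)
    env.update(METAFLOW_STEP_ENV)
    return env

cmd = build_step_command('my_flow.py', 'start', '42', '1')
```

Because the constants live in the generated file rather than being read at run time, every subprocess launched from that file sees the same configuration.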
Workflow timeout
Set a maximum wall-clock time for the entire job run:
python my_flow.py dagster create my_flow_dagster.py --workflow-timeout 3600
This adds a Dagster tag to the job definition:
@job(tags={"dagster/max_runtime": "3600"})
def MyFlow():
    ...
If the job exceeds 3600 seconds (1 hour), Dagster terminates it.
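The flag-to-tag translation is simple enough to sketch; the function name here is hypothetical, but dagster/max_runtime is the tag shown in the generated code above.

```python
def max_runtime_tag(timeout_seconds):
    # Translate --workflow-timeout into the Dagster run tag; Dagster
    # expects the tag value as a string of seconds
    if timeout_seconds <= 0:
        raise ValueError("workflow timeout must be a positive number of seconds")
    return {"dagster/max_runtime": str(timeout_seconds)}

print(max_runtime_tag(3600))
# → {'dagster/max_runtime': '3600'}
```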
Workflow timeout is separate from per-step timeouts set with @timeout. Both can be used together.
Tag attachment
Attach Metaflow tags to all objects produced by Dagster job runs:
python my_flow.py dagster create my_flow_dagster.py \
--tag env:prod \
--tag version:2 \
--tag owner:data-team
Tags are forwarded to every metaflow step subprocess:
# In the generated file
tags = ['env:prod', 'version:2', 'owner:data-team']
_run_step(
    context, "start", run_id, params_path, "1",
    tags=tags,
    ...
)
Tags are visible in the Metaflow UI and can be used for filtering runs and artifacts.
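Forwarding works by expanding the compile-time tag list into repeated --tag flags. A minimal sketch, assuming that is how the generated file passes tags to each subprocess (the helper name is hypothetical):

```python
def tag_args(tags):
    # Expand each compile-time tag into a repeated --tag flag
    args = []
    for tag in tags:
        args.extend(['--tag', tag])
    return args

print(tag_args(['env:prod', 'version:2', 'owner:data-team']))
# → ['--tag', 'env:prod', '--tag', 'version:2', '--tag', 'owner:data-team']
```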
Custom job name
Override the default job name (which is the flow name):
python my_flow.py dagster create my_flow_dagster.py --name nightly_pipeline
The generated job is named nightly_pipeline instead of MyFlow.
Project namespace behavior
If your flow uses @project(name=...), the job name is automatically prefixed:
from metaflow import FlowSpec, step, project

@project(name="recommendations")
class TrainFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass
Compiling without --name:
python train_flow.py dagster create out.py
Produces job name: recommendations_TrainFlow
You can still override with --name:
python train_flow.py dagster create out.py --name custom_job
Produces job name: custom_job
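The naming rules described above reduce to a small precedence function. This is a sketch of the behavior, not the tool's actual code; the function name is hypothetical.

```python
def derive_job_name(flow_name, project_name=None, override=None):
    # --name always wins; otherwise a @project name is prefixed;
    # otherwise the job is simply named after the flow
    if override:
        return override
    if project_name:
        return f"{project_name}_{flow_name}"
    return flow_name

print(derive_job_name("TrainFlow", project_name="recommendations"))
# → recommendations_TrainFlow
```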
Namespace configuration
Set the Metaflow namespace for production runs:
python my_flow.py dagster create my_flow_dagster.py --namespace production
The namespace is embedded in METAFLOW_TOP_ARGS and applied to all step subprocesses:
METAFLOW_TOP_ARGS: List[str] = [
    ...
    '--namespace=production',
]
Changing the namespace affects where Metaflow stores and retrieves run metadata. Ensure the namespace exists and has appropriate permissions.
Step decorators (--with)
Inject Metaflow step decorators at compile time without modifying the flow source:
# Run every step in a sandbox
python my_flow.py dagster create my_flow_dagster.py --with=sandbox
# Multiple decorators
python my_flow.py dagster create my_flow_dagster.py \
--with=sandbox \
--with='resources:cpu=4,memory=8000'
Decorators are applied to every step in the flow. The generated file includes:
STEP_WITH_DECORATORS: Dict[str, List[str]] = {
    'start': ['sandbox', 'resources:cpu=4,memory=8000'],
    'process': ['sandbox', 'resources:cpu=4,memory=8000'],
    'end': ['sandbox', 'resources:cpu=4,memory=8000'],
}
Each step subprocess receives:
python my_flow.py --with=sandbox --with=resources:cpu=4,memory=8000 step start ...
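Reconstructing that command line from the baked-in mapping can be sketched as follows; the helper name is hypothetical, but the flag format matches the subprocess invocation shown above.

```python
STEP_WITH_DECORATORS = {
    'start': ['sandbox', 'resources:cpu=4,memory=8000'],
    'end': ['sandbox', 'resources:cpu=4,memory=8000'],
}

def with_args(step_name):
    # One --with flag per decorator spec recorded for the step;
    # steps with no entry get no extra flags
    return [f'--with={spec}' for spec in STEP_WITH_DECORATORS.get(step_name, [])]

print(with_args('start'))
# → ['--with=sandbox', '--with=resources:cpu=4,memory=8000']
```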
Common use cases
Sandbox execution:
python my_flow.py dagster create my_flow_dagster.py --with=sandbox
Runs every step in an isolated environment (requires metaflow-sandbox extension).
Resource hints:
python my_flow.py dagster create my_flow_dagster.py \
--with='resources:cpu=8,memory=16000'
Forwards CPU and memory hints to the compute backend (@kubernetes, @batch, etc.).
Kubernetes execution:
python my_flow.py dagster create my_flow_dagster.py \
--with='kubernetes:image=my-image:v1'
Runs all steps on Kubernetes with a specific container image.
Combining configuration options
Set environment variables
Configure the Metaflow backend:
export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_SERVICE_URL=https://metadata.example.com
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow
Compile with flags
Add runtime configuration:
python my_flow.py dagster create my_flow_dagster.py \
--namespace production \
--workflow-timeout 7200 \
--tag env:prod \
--tag version:3 \
--with=sandbox \
--name prod_pipeline
Launch Dagster
dagster dev -f my_flow_dagster.py
All configuration is embedded in the file.
Environment variable reference
The compiler embeds environment variables matching these prefixes:
METAFLOW_DEFAULT_* - Default backends and settings
METAFLOW_DATASTORE_* - Datastore configuration
METAFLOW_DATATOOLS_* - Data processing tools
METAFLOW_SERVICE_* - Metadata service settings
METAFLOW_METADATA* - Metadata backend settings
METAFLOW_DEBUG_* - Debug flags
All matching variables are captured in METAFLOW_STEP_ENV and forwarded to step subprocesses.
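The prefix matching can be sketched directly from the list above. The prefix tuple mirrors this page (note METAFLOW_METADATA has no trailing underscore); the helper name is hypothetical.

```python
# Prefixes the compiler captures, as listed above
CAPTURED_PREFIXES = (
    'METAFLOW_DEFAULT_', 'METAFLOW_DATASTORE_', 'METAFLOW_DATATOOLS_',
    'METAFLOW_SERVICE_', 'METAFLOW_METADATA', 'METAFLOW_DEBUG_',
)

def capture_step_env(environ):
    # Keep only variables whose names start with a captured prefix;
    # str.startswith accepts a tuple of prefixes
    return {k: v for k, v in environ.items() if k.startswith(CAPTURED_PREFIXES)}

print(capture_step_env({
    'METAFLOW_DEFAULT_METADATA': 'service',
    'METAFLOW_SERVICE_URL': 'https://metadata.example.com',
    'HOME': '/root',           # not captured
}))
# → {'METAFLOW_DEFAULT_METADATA': 'service', 'METAFLOW_SERVICE_URL': 'https://metadata.example.com'}
```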
Verifying embedded configuration
To see what configuration was baked in:
Generate the file
python my_flow.py dagster create my_flow_dagster.py
Inspect METAFLOW_TOP_ARGS
Open my_flow_dagster.py and search for: METAFLOW_TOP_ARGS: List[str] = [...]
Inspect METAFLOW_STEP_ENV
Search for: METAFLOW_STEP_ENV: Dict[str, str] = {...}
These constants show exactly what configuration will be used at runtime.
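Instead of eyeballing the file, you can pull the constants out programmatically. A sketch using only the standard library's ast module, so neither Dagster nor Metaflow needs to be importable; the sample source and helper name are illustrative.

```python
import ast

# A stand-in for the generated definitions file's contents
SAMPLE = """\
METAFLOW_TOP_ARGS: List[str] = ['--quiet', '--metadata=service']
METAFLOW_STEP_ENV: Dict[str, str] = {'METAFLOW_DEFAULT_METADATA': 'service'}
"""

def extract_constant(source, name):
    # Walk top-level assignments; literal_eval reads the value without
    # executing (or importing) the generated module
    for node in ast.parse(source).body:
        if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name):
            if node.target.id == name and node.value is not None:
                return ast.literal_eval(node.value)
        if isinstance(node, ast.Assign):
            if any(isinstance(t, ast.Name) and t.id == name for t in node.targets):
                return ast.literal_eval(node.value)
    raise KeyError(name)

print(extract_constant(SAMPLE, 'METAFLOW_TOP_ARGS'))
# → ['--quiet', '--metadata=service']
```

In practice you would pass the contents of my_flow_dagster.py (e.g. via open(...).read()) instead of SAMPLE.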
Regenerating for new configuration
To change configuration:
Update environment variables or flags
Regenerate the definitions file:
python my_flow.py dagster create my_flow_dagster.py
Restart Dagster:
dagster dev -f my_flow_dagster.py
Existing Dagster runs are not affected by regeneration. Only new runs use the updated configuration.
Next steps
Scheduling Schedule flows to run automatically with @schedule
Step Decorators Learn about @retry, @timeout, @resources, and other step decorators