Overview
The @environment(vars={...}) decorator allows you to pass environment variables to Metaflow step subprocesses. This is useful for configuring libraries, setting API keys, or controlling runtime behavior without hardcoding values in your flow code.
Basic Usage
```python
from metaflow import FlowSpec, step, environment

class MyFlow(FlowSpec):

    @environment(vars={"TOKENIZERS_PARALLELISM": "false"})
    @step
    def embed(self):
        # The HuggingFace tokenizers library sees TOKENIZERS_PARALLELISM=false
        from transformers import AutoTokenizer
        tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        self.tokens = tokenizer("Hello, world!")
        self.next(self.end)

    @step
    def end(self):
        pass
```
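Under the hood this is ordinary subprocess environment passing. A minimal standalone sketch (plain Python, no Metaflow required) of how a parent process hands a variable to a child:

```python
import os
import subprocess
import sys

# Merge an extra variable into the inherited environment, as the
# generated runner does for @environment(vars={...})
env = {**os.environ, "TOKENIZERS_PARALLELISM": "false"}

# The child process sees the variable in its own os.environ
out = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['TOKENIZERS_PARALLELISM'])"],
    env=env,
    capture_output=True,
    text=True,
)
print(out.stdout.strip())  # -> false
```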
How It Works
Decorator Parsed at Compile Time
When you run `dagster create`, the compiler extracts the `vars` dict from the `@environment` decorator.
Variables Embedded in Generated Code
The environment variables are passed to the `_run_step` helper as `extra_env`:

```python
task_path = _run_step(
    context, "embed", run_id, upstream, "1",
    extra_env={"TOKENIZERS_PARALLELISM": "false"},
)
```
Merged with Subprocess Environment
The `_run_step` helper merges the extra environment variables with the subprocess environment before executing `metaflow step`:

```python
def _run_step(context, step_name, run_id, input_paths, task_id, extra_env=None, ...):
    cmd = [sys.executable, FLOW_FILE, "step", step_name, ...]
    env = {**os.environ, **METAFLOW_STEP_ENV}
    if extra_env:
        env.update(extra_env)
    subprocess.Popen(cmd, env=env, ...)
```
Multiple Variables
Pass multiple environment variables in a single decorator:
```python
@environment(vars={
    "AWS_REGION": "us-west-2",
    "LOG_LEVEL": "debug",
    "MAX_WORKERS": "4",
})
@step
def process(self):
    import os
    region = os.environ["AWS_REGION"]
    log_level = os.environ["LOG_LEVEL"]
    max_workers = int(os.environ["MAX_WORKERS"])
    # ...
```
Real-World Examples
Configuring ML Libraries
Many ML libraries use environment variables for configuration:
```python
from metaflow import FlowSpec, step, environment, resources

class TrainingFlow(FlowSpec):

    @environment(vars={
        "TOKENIZERS_PARALLELISM": "false",  # Avoid deadlocks in transformers
        "OMP_NUM_THREADS": "1",             # Control OpenMP threading
        "MKL_NUM_THREADS": "1",             # Control MKL threading
        "CUDA_VISIBLE_DEVICES": "0,1",      # Restrict GPU visibility
    })
    @resources(gpu=2)
    @step
    def train(self):
        import torch
        # Training code uses GPUs 0 and 1 only
        self.model = train_model()
        self.next(self.end)

    @step
    def end(self):
        pass
```
Setting API Keys
Security Warning: Never hardcode secrets in your flow code. Use a secrets manager (AWS Secrets Manager, Vault, etc.) and inject secrets at runtime.
For non-production testing, you can pass API keys via environment variables:
```python
@environment(vars={"OPENAI_API_KEY": "sk-test-key"})
@step
def call_api(self):
    import openai
    # The openai library reads OPENAI_API_KEY from the environment
    response = openai.Completion.create(...)
    self.result = response.choices[0].text
    self.next(self.end)
```
For production, fetch secrets dynamically:
```python
@step
def call_api_secure(self):
    import os
    import boto3

    # Fetch the secret from AWS Secrets Manager
    client = boto3.client("secretsmanager")
    secret = client.get_secret_value(SecretId="openai-key")
    api_key = secret["SecretString"]

    # Set in the environment for this process and its children
    os.environ["OPENAI_API_KEY"] = api_key
    # ...
```
Debugging and Logging
Enable verbose logging for specific steps:
```python
class DebugFlow(FlowSpec):

    @step
    def start(self):
        self.data = load_data()
        self.next(self.process)

    @environment(vars={
        "LOG_LEVEL": "DEBUG",
        "PYTHONVERBOSE": "1",
        "TF_CPP_MIN_LOG_LEVEL": "0",  # Show all TensorFlow logs
    })
    @step
    def process(self):
        # This step runs with debug logging enabled
        import logging
        import os
        logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
        # ...
        self.next(self.end)

    @step
    def end(self):
        # Normal logging level
        pass
```
Example from Test Suite
Here’s a real example from the metaflow-dagster test suite:
tests/flows/retry_flow.py
```python
from metaflow import FlowSpec, step, retry, timeout, environment

class RetryFlow(FlowSpec):

    @step
    def start(self):
        self.value = 1
        self.next(self.process)

    @retry(times=3, minutes_between_retries=2)
    @timeout(seconds=120)
    @environment(vars={"MY_VAR": "hello", "OTHER": "world"})
    @step
    def process(self):
        self.result = self.value * 2
        self.next(self.end)

    @step
    def end(self):
        pass
```
The process step receives MY_VAR=hello and OTHER=world in its subprocess environment, alongside retry and timeout configuration.
When to Use @environment
Use @environment(vars={...}) when:
Configuring third-party libraries that read environment variables
Setting runtime behavior flags (debug mode, feature flags)
Passing non-sensitive configuration data
Controlling system-level settings (thread counts, GPU visibility)
Don’t use it for:
Passing data between steps (use self.* attributes instead)
Storing secrets (use a secrets manager)
Configuration that changes per run (use Parameter instead)
Environment Variable Scope
Environment variables set via @environment are:
Step-scoped : Each step can have different environment variables
Subprocess-only : Variables are set for the metaflow step subprocess, not the Dagster op process
Additive : They merge with METAFLOW_STEP_ENV (global variables) and os.environ
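These scoping rules can be sketched in plain Python (base environment stubbed out so the result is deterministic; the names are hypothetical stand-ins for the generated code):

```python
# Stand-in for {**os.environ, **METAFLOW_STEP_ENV}
BASE_ENV = {"PATH": "/usr/bin", "LOG_LEVEL": "info"}

# Hypothetical per-step vars, as extracted from each @environment decorator
STEP_VARS = {
    "embed": {"TOKENIZERS_PARALLELISM": "false"},
    "train": {"CUDA_VISIBLE_DEVICES": "0,1"},
}

def env_for(step_name):
    env = dict(BASE_ENV)  # fresh copy per step: merging never mutates the base
    env.update(STEP_VARS.get(step_name, {}))
    return env

assert env_for("embed")["TOKENIZERS_PARALLELISM"] == "false"
assert "TOKENIZERS_PARALLELISM" not in env_for("train")  # step-scoped, no leak
```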
Global vs. Step-Level Variables
Global variables apply to every step and are set at compile time via the `step_env` parameter:

```python
compiler = DagsterCompiler(
    step_env={
        "AWS_REGION": "us-west-2",
        "LOG_LEVEL": "info",
    }
)
```

Step-level variables are set per step with `@environment(vars={...})`, as shown throughout this page.
Generated Code Structure
Here’s how environment variables flow through the generated Dagster code:
```python
# Extracted at compile time
def _get_env_vars(node) -> dict[str, str]:
    for deco in node.decorators:
        if deco.name == "environment":
            return deco.attributes.get("vars") or {}
    return {}

# Rendered in the op body
@op
def op_embed(context: OpExecutionContext, upstream: str) -> str:
    run_id = upstream.split("/")[0]
    task_path = _run_step(
        context, "embed", run_id, upstream, "1",
        extra_env={"TOKENIZERS_PARALLELISM": "false"},  # From @environment
    )
    return task_path

# Merged in the subprocess launcher
def _run_step(context, step_name, run_id, input_paths, task_id, extra_env=None, ...):
    cmd = [sys.executable, FLOW_FILE, "step", step_name, ...]
    env = _build_env(extra_env)
    subprocess.Popen(cmd, env=env, ...)

def _build_env(extra: dict | None = None) -> dict:
    e = {**os.environ, **METAFLOW_STEP_ENV}
    if extra:
        e.update(extra)
    return e
```
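A quick deterministic check of the precedence that `_build_env` implies: step-level `extra_env` overrides the global `METAFLOW_STEP_ENV`, which overrides the inherited environment (both stubbed here so the check does not depend on the host):

```python
# Stubbed inputs; in the generated code these are os.environ and the
# compile-time METAFLOW_STEP_ENV dict
inherited = {"LOG_LEVEL": "warning", "HOME": "/home/user"}
METAFLOW_STEP_ENV = {"LOG_LEVEL": "info", "AWS_REGION": "us-west-2"}

def build_env(extra=None):
    e = {**inherited, **METAFLOW_STEP_ENV}
    if extra:
        e.update(extra)
    return e

env = build_env({"LOG_LEVEL": "debug"})
assert env["LOG_LEVEL"] == "debug"       # @environment wins
assert env["AWS_REGION"] == "us-west-2"  # global step_env kept
assert env["HOME"] == "/home/user"       # inherited environment kept
```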
Best Practices
Document your environment variables: add a comment explaining what each variable does and why it's needed.
Use meaningful names: follow the UPPERCASE_WITH_UNDERSCORES convention for environment variables.
Changing @environment decorators requires re-running dagster create to regenerate the definitions file. The variables are baked into the generated code at compile time.
Environment variables are passed as strings. If you need to pass complex types, use JSON encoding:

```python
import json

@environment(vars={"CONFIG": json.dumps({"batch_size": 32, "lr": 0.001})})
@step
def train(self):
    import os
    config = json.loads(os.environ["CONFIG"])
    # ...
```
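The round trip can be verified without Metaflow; a sketch simulating what the step subprocess would see:

```python
import json
import os

# What the compiler would bake into extra_env for this step
os.environ["CONFIG"] = json.dumps({"batch_size": 32, "lr": 0.001})

# What the step body does
config = json.loads(os.environ["CONFIG"])
assert config["batch_size"] == 32
assert config["lr"] == 0.001
```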
Comparison with Other Configuration Methods
| Method | Use Case | Scope | Changeable at Runtime |
|---|---|---|---|
| `Parameter` | User-provided config | Entire flow | Yes (via Dagster UI) |
| `@environment(vars={...})` | Library config, flags | Per-step | No (compile-time) |
| `step_env` (compile-time) | Global infrastructure settings | All steps | No (compile-time) |
| Metaflow Config | Datastore, metadata settings | All steps | No (compile-time) |
Choose the right method based on when the value is known and how often it changes:
Known at flow write time + never changes : Hardcode in flow
Known at deploy time + same for all runs : Use @environment or step_env
Known at runtime + varies per run : Use Parameter
Sensitive secrets : Use a secrets manager (not any of these methods directly)