Overview

The @environment(vars={...}) decorator allows you to pass environment variables to Metaflow step subprocesses. This is useful for configuring libraries, setting API keys, or controlling runtime behavior without hardcoding values in your flow code.

Basic Usage

from metaflow import FlowSpec, step, environment

class MyFlow(FlowSpec):
    @environment(vars={"TOKENIZERS_PARALLELISM": "false"})
    @step
    def embed(self):
        # The HuggingFace tokenizers library sees TOKENIZERS_PARALLELISM=false
        from transformers import AutoTokenizer
        tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        self.tokens = tokenizer("Hello, world!")
        self.next(self.end)
    
    @step
    def end(self):
        pass
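The decorator works by injecting the variable into the step's subprocess environment. A minimal standalone sketch of that pattern (plain subprocess code, not Metaflow internals):

```python
import os
import subprocess
import sys

# Merge the extra variable into a copy of the current environment,
# then launch the child process with the merged environment.
env = {**os.environ, "TOKENIZERS_PARALLELISM": "false"}
result = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['TOKENIZERS_PARALLELISM'])"],
    env=env,
    capture_output=True,
    text=True,
)
print(result.stdout.strip())  # false
```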

How It Works

1. Decorator Parsed at Compile Time

When you run dagster create, the compiler extracts the vars dict from the @environment decorator.
2. Variables Embedded in Generated Code

The environment variables are passed to the _run_step helper as extra_env:
task_path = _run_step(
    context, "embed", run_id, upstream, "1",
    extra_env={"TOKENIZERS_PARALLELISM": "false"},
)
3. Merged with Subprocess Environment

The _run_step helper merges the extra environment variables into the subprocess environment before executing metaflow step:
def _run_step(context, step_name, run_id, input_paths, task_id, extra_env=None, ...):
    cmd = [sys.executable, FLOW_FILE, "step", step_name, ...]
    env = {**os.environ, **METAFLOW_STEP_ENV}
    if extra_env:
        env.update(extra_env)
    subprocess.Popen(cmd, env=env, ...)

Multiple Variables

Pass multiple environment variables in a single decorator:
@environment(vars={
    "AWS_REGION": "us-west-2",
    "LOG_LEVEL": "debug",
    "MAX_WORKERS": "4"
})
@step
def process(self):
    import os
    region = os.environ["AWS_REGION"]
    log_level = os.environ["LOG_LEVEL"]
    max_workers = int(os.environ["MAX_WORKERS"])
    # ...

Real-World Examples

Configuring ML Libraries

Many ML libraries use environment variables for configuration:
from metaflow import FlowSpec, step, environment, resources

class TrainingFlow(FlowSpec):
    @environment(vars={
        "TOKENIZERS_PARALLELISM": "false",  # Avoid deadlocks in transformers
        "OMP_NUM_THREADS": "1",             # Control OpenMP threading
        "MKL_NUM_THREADS": "1",             # Control MKL threading
        "CUDA_VISIBLE_DEVICES": "0,1",      # Restrict GPU visibility
    })
    @resources(gpu=2)
    @step
    def train(self):
        import torch
        # Training code uses GPUs 0 and 1 only
        self.model = train_model()
        self.next(self.end)
    
    @step
    def end(self):
        pass

Setting API Keys

Security Warning: Never hardcode secrets in your flow code. Use a secrets manager (AWS Secrets Manager, Vault, etc.) and inject secrets at runtime.
For non-production testing, you can pass API keys via environment variables:
@environment(vars={"OPENAI_API_KEY": "sk-test-key"})
@step
def call_api(self):
    import openai
    # openai library reads OPENAI_API_KEY from environment
    response = openai.Completion.create(...)
    self.result = response.choices[0].text
    self.next(self.end)
For production, fetch secrets dynamically:
@step
def call_api_secure(self):
    import os
    import boto3
    
    # Fetch secret from AWS Secrets Manager
    client = boto3.client('secretsmanager')
    secret = client.get_secret_value(SecretId='openai-key')
    api_key = secret['SecretString']
    
    # Set in environment for subprocess
    os.environ['OPENAI_API_KEY'] = api_key
    # ...

Debugging and Logging

Enable verbose logging for specific steps:
class DebugFlow(FlowSpec):
    @step
    def start(self):
        self.data = load_data()
        self.next(self.process)
    
    @environment(vars={
        "LOG_LEVEL": "DEBUG",
        "PYTHONVERBOSE": "1",
        "TF_CPP_MIN_LOG_LEVEL": "0",  # Show all TensorFlow logs
    })
    @step
    def process(self):
        # This step runs with debug logging enabled
        import logging
        import os
        logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
        # ...
        self.next(self.end)
    
    @step
    def end(self):
        # Normal logging level
        pass

Example from Test Suite

Here’s a real example from the metaflow-dagster test suite:
tests/flows/retry_flow.py
from metaflow import FlowSpec, step, retry, timeout, environment


class RetryFlow(FlowSpec):
    @step
    def start(self):
        self.value = 1
        self.next(self.process)

    @retry(times=3, minutes_between_retries=2)
    @timeout(seconds=120)
    @environment(vars={"MY_VAR": "hello", "OTHER": "world"})
    @step
    def process(self):
        self.result = self.value * 2
        self.next(self.end)

    @step
    def end(self):
        pass
The process step receives MY_VAR=hello and OTHER=world in its subprocess environment, alongside retry and timeout configuration.

When to Use @environment

Use @environment(vars={...}) when:
  • Configuring third-party libraries that read environment variables
  • Setting runtime behavior flags (debug mode, feature flags)
  • Passing non-sensitive configuration data
  • Controlling system-level settings (thread counts, GPU visibility)
Don’t use it for:
  • Passing data between steps (use self.* attributes instead)
  • Storing secrets (use a secrets manager)
  • Configuration that changes per run (use Parameter instead)

Environment Variable Scope

Environment variables set via @environment are:
  • Step-scoped: Each step can have different environment variables
  • Subprocess-only: Variables are set for the metaflow step subprocess, not the Dagster op process
  • Additive: They merge with METAFLOW_STEP_ENV (global variables) and os.environ

Global vs. Step-Level Variables

Besides per-step @environment vars, you can set variables that apply to every step by passing step_env to the compiler:
# Set at compile time via step_env parameter
compiler = DagsterCompiler(
    step_env={
        "AWS_REGION": "us-west-2",
        "LOG_LEVEL": "info",
    }
)

Generated Code Structure

Here’s how environment variables flow through the generated Dagster code:
# Extracted at compile time
def _get_env_vars(node) -> dict[str, str]:
    for deco in node.decorators:
        if deco.name == "environment":
            return deco.attributes.get("vars") or {}
    return {}

# Rendered in op body
@op
def op_embed(context: OpExecutionContext, upstream: str) -> str:
    run_id = upstream.split("/")[0]
    task_path = _run_step(
        context, "embed", run_id, upstream, "1",
        extra_env={"TOKENIZERS_PARALLELISM": "false"},  # From @environment
    )
    return task_path

# Merged in subprocess
def _run_step(context, step_name, run_id, input_paths, task_id, extra_env=None, ...):
    cmd = [sys.executable, FLOW_FILE, "step", step_name, ...]
    env = _build_env(extra_env)
    subprocess.Popen(cmd, env=env, ...)

def _build_env(extra: dict | None = None) -> dict:
    e = {**os.environ, **METAFLOW_STEP_ENV}
    if extra:
        e.update(extra)
    return e
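The merge order determines precedence: step-level extra_env wins over the global METAFLOW_STEP_ENV, which in turn wins over the inherited process environment. A quick standalone check of that ordering, reusing the _build_env logic from above (the METAFLOW_STEP_ENV dict here is assumed for the sketch; in the generated code it comes from the compiler's step_env parameter):

```python
import os

# Assumed global step-level env for this sketch.
METAFLOW_STEP_ENV = {"LOG_LEVEL": "info", "AWS_REGION": "us-west-2"}

def _build_env(extra=None):
    # Same merge as the generated code: inherited env, then globals, then step vars.
    e = {**os.environ, **METAFLOW_STEP_ENV}
    if extra:
        e.update(extra)
    return e

os.environ["LOG_LEVEL"] = "warning"             # inherited process env
env = _build_env(extra={"LOG_LEVEL": "DEBUG"})  # per-step @environment var

print(env["LOG_LEVEL"])   # DEBUG (step-level wins over global and inherited)
print(env["AWS_REGION"])  # us-west-2 (global survives untouched)
```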

Best Practices

Document your environment variables: Add comments explaining what each variable does and why it’s needed.
Use meaningful names: Follow the convention of UPPERCASE_WITH_UNDERSCORES for environment variables.
Changing @environment decorators requires re-running dagster create to regenerate the definitions file. The variables are baked into the generated code at compile time.
Environment variables are passed as strings. If you need to pass complex types, use JSON encoding:
import json

@environment(vars={"CONFIG": json.dumps({"batch_size": 32, "lr": 0.001})})
@step
def train(self):
    import os
    config = json.loads(os.environ["CONFIG"])
    # ...
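Outside of a flow, the same round trip can be sketched directly, simulating what @environment would set on the subprocess:

```python
import json
import os

# Simulate the variable the decorator would place in the step's environment...
config = {"batch_size": 32, "lr": 0.001}
os.environ["CONFIG"] = json.dumps(config)

# ...and what the step reads back.
loaded = json.loads(os.environ["CONFIG"])
print(loaded["batch_size"], loaded["lr"])  # 32 0.001
```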

Comparison with Other Configuration Methods

Method                    | Use Case                       | Scope       | Changeable at Runtime
--------------------------|--------------------------------|-------------|----------------------
Parameter                 | User-provided config           | Entire flow | Yes (via Dagster UI)
@environment(vars={...})  | Library config, flags          | Per-step    | No (compile-time)
step_env (compile-time)   | Global infrastructure settings | All steps   | No (compile-time)
Metaflow Config           | Datastore, metadata settings   | All steps   | No (compile-time)
Choose the right method based on when the value is known and how often it changes:
  • Known at flow write time + never changes: Hardcode in flow
  • Known at deploy time + same for all runs: Use @environment or step_env
  • Known at runtime + varies per run: Use Parameter
  • Sensitive secrets: Use a secrets manager (not any of these methods directly)