Skip to main content
The @environment decorator specifies environment variables to be set before a step executes.

Basic Usage

from metaflow import FlowSpec, step, environment

class MyFlow(FlowSpec):
    """Minimal flow demonstrating ``@environment`` on a single step.

    Metaflow requires every flow to define ``start`` and ``end`` steps, so
    the decorated ``configure`` step is placed between them.
    """

    @step
    def start(self):
        """Entry point; hands off to the step that reads the variables."""
        self.next(self.configure)

    @environment(vars={'API_KEY': 'secret123', 'DEBUG': 'true'})
    @step
    def configure(self):
        """Print the variables supplied through ``@environment``."""
        import os
        # Environment variables are set before step runs
        print(os.environ['API_KEY'])  # 'secret123'
        print(os.environ['DEBUG'])    # 'true'
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""
        pass

# Run the flow only when this file is executed as a script.
if __name__ == '__main__':
    MyFlow()

Description

The @environment decorator sets environment variables that will be available during the execution of a step. This is useful for:
  • Configuring application behavior
  • Passing secrets or API keys
  • Setting debug flags
  • Configuring third-party libraries
  • Controlling runtime behavior without changing code
All values are converted to strings before being set as environment variables.

Parameters

vars
Dict[str, str]
default:"{}"
Dictionary of environment variables to set. Keys are variable names, values are variable values. All values will be converted to strings.

Examples

Basic Environment Variables

@environment(vars={'LOG_LEVEL': 'INFO', 'MAX_WORKERS': '4'})
@step
def process(self):
    """Read the logging and worker settings declared on the decorator."""
    import os

    settings = os.environ
    log_level = settings['LOG_LEVEL']
    # Values arrive as strings, so numeric settings are cast explicitly.
    worker_count = settings['MAX_WORKERS']
    max_workers = int(worker_count)
    self.next(self.end)

API Configuration

@environment(vars={
    'API_ENDPOINT': 'https://api.example.com',
    'API_KEY': 'your-api-key-here',
    'API_TIMEOUT': '30'
})
@step
def call_api(self):
    """Call the configured endpoint and store the decoded JSON in ``self.data``.

    The endpoint, bearer token and timeout are all read from the environment
    variables declared on the decorator.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    import os
    import requests

    response = requests.get(
        os.environ['API_ENDPOINT'],
        headers={'Authorization': f"Bearer {os.environ['API_KEY']}"},
        # The timeout is stored as a string and must be cast before use.
        timeout=int(os.environ['API_TIMEOUT'])
    )
    # Fail the step on an HTTP error instead of storing an error payload as data.
    response.raise_for_status()
    self.data = response.json()
    self.next(self.end)

Library Configuration

@environment(vars={
    'TF_CPP_MIN_LOG_LEVEL': '2',  # Reduce TensorFlow logging
    'OMP_NUM_THREADS': '4',        # OpenMP threads
    'CUDA_VISIBLE_DEVICES': '0,1'  # Use GPUs 0 and 1
})
@step
def train_model(self):
    """Train a model with library settings supplied as environment variables."""
    import tensorflow as tf
    # TensorFlow will use the configured environment
    # NOTE(review): `train` is not defined in this snippet — presumably a
    # project helper; confirm where it comes from.
    model = train()
    self.next(self.end)

Debug Mode

@environment(vars={'DEBUG': 'true', 'VERBOSE': '1'})
@step
def debug_step(self):
    """Emit extra output when the DEBUG variable equals 'true'."""
    import os

    # The flag is a plain string; only the literal 'true' enables debug output.
    debug_enabled = os.environ.get('DEBUG') == 'true'
    if debug_enabled:
        print("Debug mode enabled")
        # Additional debug output

    self.next(self.end)

With Cloud Execution

@environment(vars={
    'AWS_DEFAULT_REGION': 'us-west-2',
    'S3_BUCKET': 'my-data-bucket',
})
@batch(cpu=4, memory=16384)
@step
def cloud_task(self):
    """Create an S3 client from the region and bucket declared on the decorator."""
    import os
    import boto3

    # Environment variables available in cloud execution
    region = os.environ['AWS_DEFAULT_REGION']
    s3 = boto3.client('s3', region_name=region)
    bucket = os.environ['S3_BUCKET']

    self.next(self.end)

Different Environments Per Step

@environment(vars={'STAGE': 'development'})
@step
def dev_task(self):
    """Load the configuration named by STAGE ('development' for this step)."""
    import os

    stage = os.environ['STAGE']
    config = load_config(stage)
    self.next(self.end)

@environment(vars={'STAGE': 'production'})
@step
def prod_task(self):
    """Load the configuration named by STAGE ('production' for this step)."""
    import os

    stage = os.environ['STAGE']
    config = load_config(stage)
    self.next(self.end)

Variable Types

All environment variable values are converted to strings:
@environment(vars={
    'STRING_VAR': 'hello',
    'INT_VAR': 42,              # Converted to '42'
    'FLOAT_VAR': 3.14,          # Converted to '3.14'
    'BOOL_VAR': True,           # Converted to 'True'
    'NONE_VAR': None            # Converted to 'None'
})
@step
def my_step(self):
    """Show that a non-string value is read back as a string inside the step."""
    import os

    # Whatever type was declared above, the value read here is a str.
    assert isinstance(os.environ['INT_VAR'], str)
    raw_value = os.environ['INT_VAR']
    value = int(raw_value)  # Convert back to int
    self.next(self.end)

Accessing Environment Variables

Access environment variables using Python’s os.environ:
import os

# Optional variable: fall back to 'default' when MY_VAR is not set
value = os.environ.get('MY_VAR', 'default')

# Required variable: indexing raises KeyError if MY_VAR is not set
value = os.environ['MY_VAR']

# Check that the variable exists before reading it
if 'MY_VAR' in os.environ:
    value = os.environ['MY_VAR']

Combining with Other Decorators

@environment(vars={'CONFIG': 'production'})
@batch(cpu=4, memory=16384)
@retry(times=3)
@step
def production_task(self):
    """Read the CONFIG variable in a step that also uses @batch and @retry."""
    # Environment variables available in cloud execution
    import os
    config = os.environ['CONFIG']
    self.next(self.end)

Best Practices

  1. Don’t commit secrets: Avoid hardcoding secrets in code. Pass them in as parameters or fetch them from a secrets manager instead.
  2. Document expected variables: Comment which environment variables your step expects
  3. Provide defaults: Use os.environ.get() with defaults for optional variables
  4. Type conversion: Remember to convert string values to appropriate types
  5. Use for configuration: Great for flags and settings that control behavior

Common Patterns

Configuration by Environment

from metaflow import Parameter

class ConfigFlow(FlowSpec):
    """Route a run to a dev or prod step, each with its own environment variables."""

    # Selects the branch taken in `start`; defaults to 'dev'.
    env = Parameter('env', default='dev')

    @step
    def start(self):
        """Transition to dev_process when env is 'dev', otherwise to prod_process."""
        # NOTE(review): the transition is chosen inside an if/else — confirm the
        # targeted Metaflow version accepts self.next() anywhere other than the
        # final top-level statement of a step.
        if self.env == 'dev':
            self.next(self.dev_process)
        else:
            self.next(self.prod_process)

    @environment(vars={'ENV': 'dev', 'DEBUG': 'true'})
    @step
    def dev_process(self):
        """Step run with ENV='dev' and DEBUG='true'."""
        # Development settings
        # NOTE(review): self.end is referenced but no `end` step is visible in
        # this snippet — confirm it is defined.
        self.next(self.end)

    @environment(vars={'ENV': 'prod', 'DEBUG': 'false'})
    @step
    def prod_process(self):
        """Step run with ENV='prod' and DEBUG='false'."""
        # Production settings
        self.next(self.end)

Library-Specific Configuration

# Configure multiple libraries
@environment(vars={
    # Pandas
    # NOTE(review): confirm pandas actually reads PANDAS_MODE; the variable it
    # consults for copy-on-write appears to be PANDAS_COPY_ON_WRITE.
    'PANDAS_MODE': 'copy_on_write',
    # NumPy
    # NOTE(review): presumably honored only by some NumPy releases — verify
    # against the version in use.
    'NUMPY_EXPERIMENTAL_ARRAY_FUNCTION': '0',
    # Matplotlib
    'MPLBACKEND': 'Agg',
    # Scikit-learn
    # NOTE(review): confirm SKLEARN_SEED has any effect outside scikit-learn's
    # own test suite.
    'SKLEARN_SEED': '42'
})
@step
def data_science_task(self):
    """Import the data-science libraries after their variables are declared."""
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    # Libraries configured via environment
    self.next(self.end)

Feature Flags

@environment(vars={
    'ENABLE_NEW_FEATURE': 'true',
    'USE_CACHE': 'true',
    'PARALLEL_PROCESSING': 'false',
})
@step
def configurable_step(self):
    """Pick code paths from string-valued feature flags and store the result."""
    import os

    def flag_enabled(name):
        # Flags are plain strings; only the literal 'true' counts as on.
        return os.environ.get(name) == 'true'

    result = new_feature_logic() if flag_enabled('ENABLE_NEW_FEATURE') else old_logic()
    self.result = process(result, cache=flag_enabled('USE_CACHE'))

    self.next(self.end)

Security Considerations

Avoid Hardcoding Secrets

Bad: Hardcoding secrets in code
@environment(vars={'API_KEY': 'sk-1234567890abcdef'})  # Don't do this!
@step
def insecure(self):
    """Anti-example: the API key is written directly into the source file."""
    pass
Good: Use parameters or secret management
from metaflow import Parameter

class SecureFlow(FlowSpec):
    """Supply an API key as a run parameter instead of hardcoding it.

    NOTE(review): no `start` or `end` step is visible in this snippet — confirm
    they are defined before running it as a complete flow.
    """

    # Required at launch; no default is provided.
    api_key = Parameter('api_key', required=True)

    @step
    def secure(self):
        """Copy the parameter into a dict consumed by the next step."""
        # Pass via parameter, not hardcoded
        # NOTE(review): this assigns the key to self.vars — presumably that
        # stores it with the run's data; confirm that is acceptable for a secret.
        self.vars = {'API_KEY': self.api_key}
        self.next(self.use_api)

    @step
    def use_api(self):
        """Export the key into this step's process environment."""
        import os
        os.environ['API_KEY'] = self.vars['API_KEY']
        # Use the API key
        self.next(self.end)
Or use a secrets manager:
@step
def fetch_secret(self):
    """Fetch the API key from AWS Secrets Manager and export it as API_KEY.

    The value is written to ``os.environ`` only; nothing is assigned to
    ``self``.
    """
    # os must be imported in the step: the body writes to os.environ.
    import os

    import boto3

    secrets = boto3.client('secretsmanager')
    secret = secrets.get_secret_value(SecretId='my-api-key')
    os.environ['API_KEY'] = secret['SecretString']
    self.next(self.end)

Runtime Override

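For local runs, each step inherits the environment of the shell that launches the flow, so variables that a step does not set with @environment can also be supplied when invoking it. Remote compute environments do not inherit your shell, so declare the variables with @environment there:
API_KEY=xyz123 DEBUG=true python flow.py run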

See Also

Build docs developers (and LLMs) love