The @environment decorator specifies environment variables to be set before a step executes.
Basic Usage
from metaflow import FlowSpec, step, environment
class MyFlow(FlowSpec):
@environment(vars={'API_KEY': 'secret123', 'DEBUG': 'true'})
@step
def configure(self):
import os
# Environment variables are set before step runs
print(os.environ['API_KEY']) # 'secret123'
print(os.environ['DEBUG']) # 'true'
self.next(self.end)
if __name__ == '__main__':
MyFlow()
Description
The @environment decorator sets environment variables that will be available during the execution of a step. This is useful for:
- Configuring application behavior
- Passing API keys or other credentials (avoid hardcoding real secrets; see Security Considerations)
- Setting debug flags
- Configuring third-party libraries
- Controlling runtime behavior without changing code
All values are converted to strings before being set as environment variables.
Parameters
vars
Dict[str, str]
default: {}
Dictionary of environment variables to set. Keys are variable names and values are the corresponding variable values. Non-string values are converted to strings.
Examples
Basic Environment Variables
@environment(vars={
'LOG_LEVEL': 'INFO',
'MAX_WORKERS': '4'
})
@step
def process(self):
import os
log_level = os.environ['LOG_LEVEL']
max_workers = int(os.environ['MAX_WORKERS'])
self.next(self.end)
API Configuration
@environment(vars={
'API_ENDPOINT': 'https://api.example.com',
'API_KEY': 'your-api-key-here',
'API_TIMEOUT': '30'
})
@step
def call_api(self):
import os
import requests
response = requests.get(
os.environ['API_ENDPOINT'],
headers={'Authorization': f"Bearer {os.environ['API_KEY']}"},
timeout=int(os.environ['API_TIMEOUT'])
)
self.data = response.json()
self.next(self.end)
Library Configuration
@environment(vars={
'TF_CPP_MIN_LOG_LEVEL': '2', # Reduce TensorFlow logging
'OMP_NUM_THREADS': '4', # OpenMP threads
'CUDA_VISIBLE_DEVICES': '0,1' # Use GPUs 0 and 1
})
@step
def train_model(self):
import tensorflow as tf
# TensorFlow will use the configured environment
model = train()
self.next(self.end)
Debug Mode
@environment(vars={'DEBUG': 'true', 'VERBOSE': '1'})
@step
def debug_step(self):
import os
if os.environ.get('DEBUG') == 'true':
print("Debug mode enabled")
# Additional debug output
self.next(self.end)
With Cloud Execution
@environment(vars={
'AWS_DEFAULT_REGION': 'us-west-2',
'S3_BUCKET': 'my-data-bucket'
})
@batch(cpu=4, memory=16384)
@step
def cloud_task(self):
import os
import boto3
# Environment variables available in cloud execution
s3 = boto3.client('s3', region_name=os.environ['AWS_DEFAULT_REGION'])
bucket = os.environ['S3_BUCKET']
self.next(self.end)
Different Environments Per Step
@environment(vars={'STAGE': 'development'})
@step
def dev_task(self):
import os
config = load_config(os.environ['STAGE'])
self.next(self.end)
@environment(vars={'STAGE': 'production'})
@step
def prod_task(self):
import os
config = load_config(os.environ['STAGE'])
self.next(self.end)
Variable Types
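All environment variable values are converted to strings. Note that None becomes the literal string 'None' (the variable is still set) and booleans become 'True' or 'False', both of which are non-empty and therefore truthy: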
@environment(vars={
'STRING_VAR': 'hello',
'INT_VAR': 42, # Converted to '42'
'FLOAT_VAR': 3.14, # Converted to '3.14'
'BOOL_VAR': True, # Converted to 'True'
'NONE_VAR': None # Converted to 'None'
})
@step
def my_step(self):
import os
# All values are strings
assert isinstance(os.environ['INT_VAR'], str)
value = int(os.environ['INT_VAR']) # Convert back to int
self.next(self.end)
Accessing Environment Variables
Access environment variables using Python’s os.environ:
import os
# Get with default value
value = os.environ.get('MY_VAR', 'default')
# Get (raises KeyError if not set)
value = os.environ['MY_VAR']
# Check if variable exists
if 'MY_VAR' in os.environ:
value = os.environ['MY_VAR']
Combining with Other Decorators
@environment(vars={'CONFIG': 'production'})
@batch(cpu=4, memory=16384)
@retry(times=3)
@step
def production_task(self):
# Environment variables available in cloud execution
import os
config = os.environ['CONFIG']
self.next(self.end)
Best Practices
- Don’t commit secrets: Avoid hardcoding secrets in code. Use parameter passing or a secrets manager (for example, Metaflow's @secrets decorator) instead.
- Document expected variables: Comment which environment variables your step expects
- Provide defaults: Use os.environ.get() with defaults for optional variables
- Type conversion: Remember to convert string values to appropriate types
- Use for configuration: Great for flags and settings that control behavior
Common Patterns
Configuration by Environment
from metaflow import Parameter
class ConfigFlow(FlowSpec):
env = Parameter('env', default='dev')
@step
def start(self):
if self.env == 'dev':
self.next(self.dev_process)
else:
self.next(self.prod_process)
@environment(vars={'ENV': 'dev', 'DEBUG': 'true'})
@step
def dev_process(self):
# Development settings
self.next(self.end)
@environment(vars={'ENV': 'prod', 'DEBUG': 'false'})
@step
def prod_process(self):
# Production settings
self.next(self.end)
Library-Specific Configuration
# Configure multiple libraries
@environment(vars={
# Pandas
'PANDAS_MODE': 'copy_on_write',
# NumPy
'NUMPY_EXPERIMENTAL_ARRAY_FUNCTION': '0',
# Matplotlib
'MPLBACKEND': 'Agg',
# Scikit-learn
'SKLEARN_SEED': '42'
})
@step
def data_science_task(self):
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# Libraries configured via environment
self.next(self.end)
Feature Flags
@environment(vars={
'ENABLE_NEW_FEATURE': 'true',
'USE_CACHE': 'true',
'PARALLEL_PROCESSING': 'false'
})
@step
def configurable_step(self):
import os
if os.environ.get('ENABLE_NEW_FEATURE') == 'true':
result = new_feature_logic()
else:
result = old_logic()
use_cache = os.environ.get('USE_CACHE') == 'true'
self.result = process(result, cache=use_cache)
self.next(self.end)
Security Considerations
Avoid Hardcoding Secrets
❌ Bad: Hardcoding secrets in code
@environment(vars={'API_KEY': 'sk-1234567890abcdef'}) # Don't do this!
@step
def insecure(self):
pass
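✅ Good: Use parameters or secret management. Keep in mind that parameter values and attributes assigned to self are stored as run artifacts, so prefer a secrets manager for highly sensitive values.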
from metaflow import Parameter
class SecureFlow(FlowSpec):
api_key = Parameter('api_key', required=True)
@step
def secure(self):
# Pass via parameter, not hardcoded
self.vars = {'API_KEY': self.api_key}
self.next(self.use_api)
@step
def use_api(self):
import os
os.environ['API_KEY'] = self.vars['API_KEY']
# Use the API key
self.next(self.end)
Or use a secrets manager (this snippet also requires import os inside the step):
@step
def fetch_secret(self):
import boto3
secrets = boto3.client('secretsmanager')
api_key = secrets.get_secret_value(SecretId='my-api-key')
os.environ['API_KEY'] = api_key['SecretString']
self.next(self.end)
Runtime Override
Environment variables can be set at runtime using the --env option:
python flow.py run --env API_KEY=xyz123 --env DEBUG=true
See Also