Overview
This guide covers best practices for building, testing, and deploying production Metaflow workflows. Follow these patterns to create maintainable, scalable, and reliable data pipelines.

Code Organization
Separate Logic from Flow Definition
Extract business logic into separate functions for better testability:

# data_processing.py
def clean_data(raw_data):
"""Clean and validate input data."""
# Reusable, testable logic
return cleaned_data
def train_model(data, hyperparams):
"""Train ML model."""
# Separate from flow orchestration
return model
# flow.py
from metaflow import FlowSpec, step
from data_processing import clean_data, train_model
class MLFlow(FlowSpec):
@step
def start(self):
# Flow orchestrates, doesn't implement
self.data = clean_data(load_raw_data())
self.next(self.train)
@step
def train(self):
self.model = train_model(self.data, self.params)
self.next(self.end)
Use Clear Step Names
Choose descriptive, action-oriented step names:

# Good: Clear intent
@step
def load_training_data(self):
pass
@step
def preprocess_features(self):
pass
@step
def train_model(self):
pass
# Avoid: Vague names
@step
def step1(self):
pass
@step
def process(self):
pass
Modularize Complex Flows
Break large flows into smaller, reusable components:

# base_flow.py
from metaflow import FlowSpec
class BaseMLFlow(FlowSpec):
"""Shared ML workflow components."""
def load_data(self):
"""Common data loading logic."""
pass
def evaluate_model(self, model):
"""Common evaluation logic."""
pass
# specific_flow.py
from base_flow import BaseMLFlow
from metaflow import step
class ImageClassificationFlow(BaseMLFlow):
@step
def start(self):
self.data = self.load_data()
self.next(self.train)
@step
def train(self):
# Image-specific training
self.model = train_cnn(self.data)
self.metrics = self.evaluate_model(self.model)
self.next(self.end)
Error Handling
Use @retry for Transient Failures
Apply retry logic for operations that might fail temporarily:

from metaflow import FlowSpec, step, retry
class RobustFlow(FlowSpec):
@retry(times=3, minutes_between_retries=2)
@step
def fetch_data(self):
# Network requests, API calls
self.data = fetch_from_api()
self.next(self.process)
@retry(times=0) # Disable retry for non-idempotent operations
@step
def update_database(self):
# Database updates should not retry
write_to_db(self.results)
self.next(self.end)
Combine @retry with @catch
Gracefully handle failures after retries are exhausted:

from metaflow import FlowSpec, step, retry, catch
class FaultTolerantFlow(FlowSpec):
@retry(times=3)
@catch(var='api_error')
@step
def fetch_external_data(self):
self.data = call_external_api()
self.next(self.process)
@step
def process(self):
if hasattr(self, 'api_error'):
# Use fallback data
self.data = load_cached_data()
self.results = process(self.data)
self.next(self.end)
Validate Inputs and Outputs
Add validation checks at critical points:

from metaflow import FlowSpec, step
class ValidatedFlow(FlowSpec):
@step
def start(self):
data = load_data()
# Validate inputs
assert len(data) > 0, "Data cannot be empty"
assert all(x > 0 for x in data), "All values must be positive"
self.data = data
self.next(self.process)
@step
def process(self):
result = transform(self.data)
# Validate outputs
assert result is not None, "Result cannot be None"
assert len(result) == len(self.data), "Size mismatch"
self.result = result
self.next(self.end)
Resource Management
Right-size Compute Resources
Specify appropriate resources for each step:

from metaflow import FlowSpec, step, resources
class OptimizedFlow(FlowSpec):
@resources(memory=2048) # Light step: 2GB
@step
def start(self):
self.data = load_small_dataset()
self.next(self.train)
@resources(cpu=8, memory=32768, gpu=1) # Heavy step: 8 CPU, 32GB, 1 GPU
@step
def train(self):
self.model = train_large_model(self.data)
self.next(self.end)
@resources(memory=4096) # Medium step: 4GB
@step
def end(self):
save_model(self.model)
Use Batch for Long-running Jobs
Run expensive operations on batch compute:

from metaflow import FlowSpec, step, batch
class BatchFlow(FlowSpec):
@step
def start(self):
# Quick local step
self.jobs = prepare_job_list()
self.next(self.process, foreach='jobs')
@batch(cpu=4, memory=16384)
@step
def process(self):
# Heavy processing on batch compute
self.result = expensive_computation(self.input)
self.next(self.join)
@step
def join(self, inputs):
self.all_results = [inp.result for inp in inputs]
self.next(self.end)
Parallelize with Foreach
Use foreach for parallel processing:

from metaflow import FlowSpec, step
class ParallelFlow(FlowSpec):
@step
def start(self):
# Split work into parallel tasks
self.items = list(range(100))
self.next(self.process, foreach='items')
@step
def process(self):
# Each branch processes independently
self.result = expensive_operation(self.input)
self.next(self.join)
@step
def join(self, inputs):
# Merge results efficiently
self.results = [inp.result for inp in inputs]
self.next(self.end)
Data Management
Store Large Artifacts Efficiently
Use external storage for large data:

from metaflow import FlowSpec, step, S3
import pickle
class DataFlow(FlowSpec):
@step
def start(self):
large_data = load_huge_dataset()
# Store large data in S3
with S3(run=self) as s3:
s3.put('large_data', pickle.dumps(large_data))
self.s3_key = s3.key('large_data')
# Only store reference in artifact
self.data_size = len(large_data)
self.next(self.process)
@step
def process(self):
# Load data when needed
with S3(run=self) as s3:
large_data = pickle.loads(s3.get(self.s3_key).blob)
self.result = process(large_data)
self.next(self.end)
Use IncludeFile for Configuration
Include configuration files in your flow:

from metaflow import FlowSpec, step, IncludeFile
import json
class ConfigFlow(FlowSpec):
config = IncludeFile(
'config',
help='Configuration file',
default='config.json'
)
@step
def start(self):
# Parse configuration
cfg = json.loads(self.config)
self.model_params = cfg['model']
self.data_params = cfg['data']
self.next(self.train)
Clean Up Temporary Data
Avoid storing unnecessary temporary data:

from metaflow import FlowSpec, step
class CleanFlow(FlowSpec):
@step
def start(self):
# Don't store temporary variables
temp_data = load_data()
processed = transform(temp_data) # temp_data not stored
# Only store what's needed
self.final_data = processed
self.next(self.end)
Production Deployment
Use Parameters for Configuration
Make flows configurable with parameters:

from metaflow import FlowSpec, step, Parameter
class ProductionFlow(FlowSpec):
environment = Parameter(
'environment',
help='Deployment environment',
default='dev'
)
model_version = Parameter(
'model-version',
help='Model version to deploy',
required=True
)
batch_size = Parameter(
'batch-size',
help='Processing batch size',
default=1000,
type=int
)
@step
def start(self):
print(f"Deploying {self.model_version} to {self.environment}")
self.config = load_config(self.environment)
self.next(self.process)
Tag Production Runs
Mark and track production deployments:

from metaflow import FlowSpec, step, current
from datetime import datetime
class DeploymentFlow(FlowSpec):
@step
def start(self):
if self.environment == 'production':
# Tag production runs
current.run.add_tag('production')
current.run.add_tag(f"deployed-{datetime.now().date()}")
current.run.add_tag(f"version:{self.model_version}")
self.next(self.deploy)
Version Everything
Track versions for reproducibility:

from metaflow import FlowSpec, step, Parameter
import subprocess
class VersionedFlow(FlowSpec):
model_version = Parameter('model-version', required=True)
@step
def start(self):
# Capture environment information
self.git_commit = subprocess.check_output(
['git', 'rev-parse', 'HEAD']
).decode().strip()
self.python_version = sys.version
self.metaflow_version = get_version()
# Track dependencies
self.requirements = subprocess.check_output(
['pip', 'freeze']
).decode()
self.next(self.train)
Implement Monitoring
Add health checks and monitoring:

from metaflow import FlowSpec, step
import time
class MonitoredFlow(FlowSpec):
@step
def start(self):
start_time = time.time()
try:
self.data = load_data()
self.data_load_time = time.time() - start_time
# Log metrics
print(f"Data loaded in {self.data_load_time:.2f}s")
print(f"Dataset size: {len(self.data)}")
# Health check
assert self.data_load_time < 300, "Data loading took too long"
except Exception as e:
# Alert on failure
send_alert(f"Data loading failed: {e}")
raise
self.next(self.process)
Testing
Write Unit Tests
Test business logic separately:

# test_processing.py
import pytest
from data_processing import clean_data, validate_schema
def test_clean_data():
raw = [1, 2, None, 3, -1]
cleaned = clean_data(raw)
assert None not in cleaned
assert all(x >= 0 for x in cleaned)
def test_validate_schema():
valid_data = {'id': 1, 'value': 100}
assert validate_schema(valid_data) == True
invalid_data = {'id': 1} # missing 'value'
with pytest.raises(ValueError):
validate_schema(invalid_data)
Test Flows End-to-End
Run integration tests:

# test_flow.py
import subprocess
from metaflow import Flow
def test_flow_execution():
# Run flow
result = subprocess.run(
['python', 'myflow.py', 'run'],
capture_output=True
)
assert result.returncode == 0
# Verify results
run = Flow('MyFlow').latest_run
assert run.successful
assert run['end'].task.data is not None
Use CI/CD
Automate testing and deployment:

# .github/workflows/test.yml
name: Test Flows
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: '3.9'
- run: pip install -r requirements.txt
- run: pytest tests/
- run: python myflow.py run --environment test
Documentation
Document Your Flows
Add clear docstrings:

from metaflow import FlowSpec, step, Parameter
class WellDocumentedFlow(FlowSpec):
"""
Production ML pipeline for customer churn prediction.
This flow:
1. Loads customer data from the data warehouse
2. Preprocesses features and handles missing values
3. Trains a gradient boosting model
4. Evaluates on test set and generates reports
5. Deploys model if accuracy > 0.85
Example:
python flow.py run --model-version v2.1.0
"""
model_version = Parameter(
'model-version',
help='Semantic version for this model (e.g., v2.1.0)',
required=True
)
@step
def start(self):
"""
Load raw customer data from Snowflake.
Queries the customers table and filters for active accounts.
Stores data in self.raw_data.
"""
self.raw_data = load_customer_data()
self.next(self.preprocess)
Add Helpful Messages
Provide user feedback:

@step
def process(self):
print(f"Processing {len(self.data)} items...")
for i, item in enumerate(self.data):
if i % 100 == 0:
print(f"Progress: {i}/{len(self.data)}")
process_item(item)
print("Processing complete!")
self.next(self.end)
Common Pitfalls to Avoid
Don't use mutable default arguments
# Bad: Mutable default
@step
def process(self, items=[]):
items.append(1) # Shared across calls!
self.items = items
# Good: Use None and create new
@step
def process(self, items=None):
if items is None:
items = []
items.append(1)
self.items = items
Don't store non-serializable objects
# Bad: Can't serialize
@step
def start(self):
self.db_connection = create_connection() # Won't serialize!
# Good: Store connection info
@step
def start(self):
self.db_config = {'host': 'localhost', 'port': 5432}
# Create connection when needed
conn = create_connection(self.db_config)
Don't forget to call self.next()
# Bad: Missing next()
@step
def start(self):
self.data = load_data()
# Forgot self.next()!
# Good: Always call next()
@step
def start(self):
self.data = load_data()
self.next(self.process) # Required!
Don't use global state
# Bad: Global variable
GLOBAL_CACHE = {}
@step
def start(self):
GLOBAL_CACHE['key'] = 'value' # Won't work across tasks!
# Good: Use self attributes
@step
def start(self):
self.cache = {'key': 'value'} # Properly serialized
Performance Tips
Minimize Data Movement
# Bad: Unnecessary data copying
@step
def process(self):
data_copy = self.large_data.copy() # Expensive!
process(data_copy)
# Good: Process in place or use references
@step
def process(self):
process(self.large_data) # No copy needed
Batch Operations
# Bad: One at a time
@step
def process(self):
for item in self.items:
result = api_call(item) # Slow!
# Good: Batch processing
@step
def process(self):
results = batch_api_call(self.items) # Much faster!
Use Appropriate Data Structures
# Bad: List for lookups
self.items = [1, 2, 3, 4, 5] # O(n) lookup
if x in self.items: # Slow for large lists
pass
# Good: Set for lookups
self.items = {1, 2, 3, 4, 5} # O(1) lookup
if x in self.items: # Fast!
pass
Next Steps
Debugging
Debug flows effectively
Testing
Write comprehensive tests
Versioning
Track and manage versions
Tagging
Organize runs with tags
