Metaflow Best Practices

Overview

This guide covers best practices for building, testing, and deploying production Metaflow workflows. Follow these patterns to create maintainable, scalable, and reliable data pipelines.

Code Organization

Separate Logic from Flow Definition

Extract business logic into separate functions for better testability:
# data_processing.py
def clean_data(raw_data):
    """Clean and validate input data.

    Placeholder example: the cleaning logic is elided.
    NOTE(review): `cleaned_data` is undefined as written — the real
    implementation must bind it before returning.
    """
    # Reusable, testable logic
    return cleaned_data

def train_model(data, hyperparams):
    """Train ML model.

    Placeholder example: the training logic is elided.
    NOTE(review): `model` is undefined as written — the real
    implementation must bind it before returning.
    """
    # Separate from flow orchestration
    return model

# flow.py
from metaflow import FlowSpec, step
from data_processing import clean_data, train_model

class MLFlow(FlowSpec):
    """Example flow where steps orchestrate and helpers implement.

    Business logic lives in `data_processing`; the flow only wires
    artifacts between steps, keeping the helpers unit-testable.
    """

    @step
    def start(self):
        # Flow orchestrates, doesn't implement.
        # NOTE: `load_raw_data` is assumed to be defined alongside
        # `clean_data` in data_processing.
        self.data = clean_data(load_raw_data())
        # Hyperparameters for the train step — the original example
        # read self.params without ever setting it.
        self.params = {}
        self.next(self.train)

    @step
    def train(self):
        self.model = train_model(self.data, self.params)
        self.next(self.end)

    @step
    def end(self):
        # Every Metaflow flow must define an `end` step; the original
        # example omitted it, which fails flow validation.
        pass

Use Clear Step Names

Choose descriptive, action-oriented step names:
# Good: Clear intent
# Good: Clear intent — the DAG is readable from step names alone
@step
def load_training_data(self):
    pass

@step
def preprocess_features(self):
    pass

@step
def train_model(self):
    pass

# Avoid: Vague names — they reveal nothing about what the step does
@step
def step1(self):
    pass

@step
def process(self):
    pass

Modularize Complex Flows

Break large flows into smaller, reusable components:
# base_flow.py
from metaflow import FlowSpec

class BaseMLFlow(FlowSpec):
    """Shared ML workflow components.

    Helpers here are plain methods (no @step), so subclasses can call
    them from inside their own steps.
    """
    
    def load_data(self):
        """Common data loading logic (placeholder)."""
        pass
    
    def evaluate_model(self, model):
        """Common evaluation logic (placeholder)."""
        pass

# specific_flow.py
from base_flow import BaseMLFlow
from metaflow import step

class ImageClassificationFlow(BaseMLFlow):
    """Image-specific flow built on the shared BaseMLFlow helpers."""
    
    @step
    def start(self):
        # Inherited helper from BaseMLFlow
        self.data = self.load_data()
        self.next(self.train)
    
    @step
    def train(self):
        # Image-specific training; `train_cnn` is assumed to be defined
        # elsewhere (not shown). The mandatory `end` step is omitted
        # from this excerpt for brevity.
        self.model = train_cnn(self.data)
        self.metrics = self.evaluate_model(self.model)
        self.next(self.end)

Error Handling

Use @retry for Transient Failures

Apply retry logic for operations that might fail temporarily:
from metaflow import FlowSpec, step, retry

class RobustFlow(FlowSpec):
    """Applies @retry selectively: retry transient failures, disable
    retries where re-running would duplicate side effects."""
    
    @retry(times=3, minutes_between_retries=2)
    @step
    def fetch_data(self):
        # Network requests, API calls — safe to re-run (idempotent)
        self.data = fetch_from_api()
        self.next(self.process)
    
    @retry(times=0)  # Disable retry for non-idempotent operations
    @step
    def update_database(self):
        # Database updates should not retry: a partial write followed
        # by a retry could apply the same change twice
        write_to_db(self.results)
        self.next(self.end)

Combine @retry with @catch

Gracefully handle failures after retries are exhausted:
from metaflow import FlowSpec, step, retry, catch

class FaultTolerantFlow(FlowSpec):
    """Retries a flaky external call, then falls back to cached data
    once all retries are exhausted."""

    # @catch sits above @retry (matching the Metaflow docs): retries
    # run first, and only after they are exhausted does @catch store
    # the exception in self.api_error instead of failing the run.
    @catch(var='api_error')
    @retry(times=3)
    @step
    def fetch_external_data(self):
        self.data = call_external_api()
        self.next(self.process)

    @step
    def process(self):
        if hasattr(self, 'api_error'):
            # Fetch failed after retries: use fallback data
            self.data = load_cached_data()

        # Renamed from `process(...)`: the original called an undefined
        # function that shadowed this step's own name.
        self.results = process_data(self.data)
        self.next(self.end)

Validate Inputs and Outputs

Add validation checks at critical points:
from metaflow import FlowSpec, step

class ValidatedFlow(FlowSpec):
    """Validates data at step boundaries so bad inputs fail fast.

    NOTE(review): `assert` statements are stripped when Python runs
    with -O; for production validation prefer raising ValueError.
    """
    
    @step
    def start(self):
        data = load_data()
        
        # Validate inputs before persisting them as artifacts
        assert len(data) > 0, "Data cannot be empty"
        assert all(x > 0 for x in data), "All values must be positive"
        
        self.data = data
        self.next(self.process)
    
    @step
    def process(self):
        result = transform(self.data)
        
        # Validate outputs before handing off to the next step
        assert result is not None, "Result cannot be None"
        assert len(result) == len(self.data), "Size mismatch"
        
        self.result = result
        self.next(self.end)

Resource Management

Right-size Compute Resources

Specify appropriate resources for each step:
from metaflow import FlowSpec, step, resources

class OptimizedFlow(FlowSpec):
    """Sizes compute per step; memory values are in MB.

    NOTE(review): @resources takes effect when the step runs on a
    remote backend (e.g. --with batch); it is advisory locally.
    """
    
    @resources(memory=2048)  # Light step: 2GB
    @step
    def start(self):
        self.data = load_small_dataset()
        self.next(self.train)
    
    @resources(cpu=8, memory=32768, gpu=1)  # Heavy step: 8 CPU, 32GB, 1 GPU
    @step
    def train(self):
        self.model = train_large_model(self.data)
        self.next(self.end)
    
    @resources(memory=4096)  # Medium step: 4GB
    @step
    def end(self):
        save_model(self.model)

Use Batch for Long-running Jobs

Run expensive operations on batch compute:
from metaflow import FlowSpec, step, batch

class BatchFlow(FlowSpec):
    """Fans work out with foreach and runs each branch on AWS Batch."""
    
    @step
    def start(self):
        # Quick local step: build the list of work items
        self.jobs = prepare_job_list()
        self.next(self.process, foreach='jobs')
    
    @batch(cpu=4, memory=16384)
    @step
    def process(self):
        # Heavy processing on batch compute.
        # self.input is the current foreach item from self.jobs.
        self.result = expensive_computation(self.input)
        self.next(self.join)
    
    @step
    def join(self, inputs):
        # Collect per-branch results; the `end` step is omitted from
        # this excerpt for brevity.
        self.all_results = [inp.result for inp in inputs]
        self.next(self.end)

Parallelize with Foreach

Use foreach for parallel processing:
from metaflow import FlowSpec, step

class ParallelFlow(FlowSpec):
    """Processes 100 items as independent parallel foreach branches."""
    
    @step
    def start(self):
        # Split work into parallel tasks; one branch per list element
        self.items = list(range(100))
        self.next(self.process, foreach='items')
    
    @step
    def process(self):
        # Each branch processes independently.
        # self.input is the current foreach item from self.items.
        self.result = expensive_operation(self.input)
        self.next(self.join)
    
    @step
    def join(self, inputs):
        # Merge results efficiently; `inputs` iterates the branches
        self.results = [inp.result for inp in inputs]
        self.next(self.end)

Data Management

Store Large Artifacts Efficiently

Use external storage for large data:
from metaflow import FlowSpec, step, S3
import pickle

class DataFlow(FlowSpec):
    """Stores a large object in run-scoped S3 storage instead of a
    Metaflow artifact, keeping the flow's datastore small."""

    @step
    def start(self):
        large_data = load_huge_dataset()

        # Store large data in S3 under a key we choose ourselves.
        # NOTE: the Metaflow S3 client has no `key()` method — the
        # original example's `s3.key('large_data')` would raise.
        # `s3.put` returns the object URL; with a run-scoped client we
        # only need the relative key to fetch it back.
        with S3(run=self) as s3:
            s3.put('large_data', pickle.dumps(large_data))
        self.s3_key = 'large_data'

        # Only store lightweight metadata as artifacts
        self.data_size = len(large_data)
        self.next(self.process)

    @step
    def process(self):
        # Re-open a run-scoped client and fetch by the same key
        with S3(run=self) as s3:
            large_data = pickle.loads(s3.get(self.s3_key).blob)

        # Renamed from `process(...)`: the original called an undefined
        # function that shadowed this step's own name.
        self.result = process_data(large_data)
        self.next(self.end)

Use IncludeFile for Configuration

Include configuration files in your flow:
from metaflow import FlowSpec, step, IncludeFile
import json

class ConfigFlow(FlowSpec):
    """Reads model/data settings from a JSON file shipped with the run.
    IncludeFile snapshots the file's contents at launch time, so the
    exact configuration is versioned with the run."""
    
    # self.config holds the raw file contents as a string
    config = IncludeFile(
        'config',
        help='Configuration file',
        default='config.json'
    )
    
    @step
    def start(self):
        # Parse configuration; later steps are omitted from this excerpt
        cfg = json.loads(self.config)
        self.model_params = cfg['model']
        self.data_params = cfg['data']
        self.next(self.train)

Clean Up Temporary Data

Avoid storing unnecessary temporary data:
from metaflow import FlowSpec, step

class CleanFlow(FlowSpec):
    """Persists only final results: locals vanish when the task ends,
    while every self.* attribute is pickled into the datastore."""
    
    @step
    def start(self):
        # Don't store temporary variables as self.* attributes
        temp_data = load_data()
        processed = transform(temp_data)  # temp_data not stored
        
        # Only store what's needed downstream
        self.final_data = processed
        self.next(self.end)

Production Deployment

Use Parameters for Configuration

Make flows configurable with parameters:
from metaflow import FlowSpec, step, Parameter

class ProductionFlow(FlowSpec):
    """Configurable deployment flow.

    Parameters become CLI flags, e.g.:
        python flow.py run --environment prod --model-version v1 --batch-size 500
    """
    
    # Optional: defaults to 'dev' when the flag is omitted
    environment = Parameter(
        'environment',
        help='Deployment environment',
        default='dev'
    )
    
    # Required: the run fails to start without --model-version
    model_version = Parameter(
        'model-version',
        help='Model version to deploy',
        required=True
    )
    
    # Explicit type=int so the CLI string is converted for us
    batch_size = Parameter(
        'batch-size',
        help='Processing batch size',
        default=1000,
        type=int
    )
    
    @step
    def start(self):
        print(f"Deploying {self.model_version} to {self.environment}")
        self.config = load_config(self.environment)
        self.next(self.process)

Tag Production Runs

Mark and track production deployments:
from metaflow import FlowSpec, step, current
from datetime import datetime

class DeploymentFlow(FlowSpec):
    """Tags production runs so they can be queried via the client API.

    Assumes `environment` and `model_version` Parameters are declared
    on the flow (see ProductionFlow above).
    """

    @step
    def start(self):
        if self.environment == 'production':
            # `current` exposes ids (flow_name, run_id), not a Run
            # object — the original `current.run.add_tag(...)` would
            # raise AttributeError. Tags are added through the client.
            from metaflow import Run
            this_run = Run(f"{current.flow_name}/{current.run_id}")
            this_run.add_tag('production')
            this_run.add_tag(f"deployed-{datetime.now().date()}")
            this_run.add_tag(f"version:{self.model_version}")

        self.next(self.deploy)

Version Everything

Track versions for reproducibility:
from metaflow import FlowSpec, step, Parameter
import subprocess

class VersionedFlow(FlowSpec):
    """Captures code, interpreter, and dependency versions as run
    artifacts so any run can be reproduced later."""

    model_version = Parameter('model-version', required=True)

    @step
    def start(self):
        # Local imports: the original example used `sys` and an
        # undefined `get_version()` without importing either.
        import sys
        from metaflow import metaflow_version

        # Capture environment information
        self.git_commit = subprocess.check_output(
            ['git', 'rev-parse', 'HEAD']
        ).decode().strip()

        self.python_version = sys.version
        self.metaflow_version = metaflow_version.get_version()

        # Track dependencies; `python -m pip` guarantees we freeze the
        # interpreter actually running this flow, not whichever `pip`
        # happens to be first on PATH.
        self.requirements = subprocess.check_output(
            [sys.executable, '-m', 'pip', 'freeze']
        ).decode()

        self.next(self.train)

Implement Monitoring

Add health checks and monitoring:
from metaflow import FlowSpec, step
import time

class MonitoredFlow(FlowSpec):
    """Times data loading, logs metrics, and alerts on failure.

    NOTE(review): `send_alert` is assumed to be defined elsewhere; the
    health-check assert is stripped when Python runs with -O.
    """
    
    @step
    def start(self):
        start_time = time.time()
        
        try:
            self.data = load_data()
            self.data_load_time = time.time() - start_time
            
            # Log metrics (visible in task logs / Metaflow UI)
            print(f"Data loaded in {self.data_load_time:.2f}s")
            print(f"Dataset size: {len(self.data)}")
            
            # Health check: 5-minute budget for the load
            assert self.data_load_time < 300, "Data loading took too long"
            
        except Exception as e:
            # Broad catch is deliberate: alert on any failure, then
            # re-raise so the task still fails visibly
            send_alert(f"Data loading failed: {e}")
            raise
        
        self.next(self.process)

Testing

Write Unit Tests

Test business logic separately:
# test_processing.py
import pytest
from data_processing import clean_data, validate_schema

def test_clean_data():
    """clean_data drops None entries and negative values."""
    sample = [1, 2, None, 3, -1]
    result = clean_data(sample)
    assert None not in result
    assert all(value >= 0 for value in result)

def test_validate_schema():
    """validate_schema accepts complete records and raises on missing keys."""
    valid_data = {'id': 1, 'value': 100}
    # PEP 8 (E712): assert truthiness directly instead of `== True`
    assert validate_schema(valid_data)

    invalid_data = {'id': 1}  # missing 'value'
    with pytest.raises(ValueError):
        validate_schema(invalid_data)

Test Flows End-to-End

Run integration tests:
# test_flow.py
import subprocess
from metaflow import Flow

def test_flow_execution():
    """Run the flow as a subprocess, then verify results via the client API."""
    # Run flow; decode output so a failure message is readable
    result = subprocess.run(
        ['python', 'myflow.py', 'run'],
        capture_output=True,
        text=True
    )
    # Surface stderr on failure — a bare returncode assert gives no
    # diagnostics in CI logs
    assert result.returncode == 0, result.stderr

    # Verify results through the Metaflow client
    run = Flow('MyFlow').latest_run
    assert run.successful
    assert run['end'].task.data is not None

Use CI/CD

Automate testing and deployment:
# .github/workflows/test.yml
name: Test Flows

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      # v2 of these actions runs on a deprecated Node runtime and is
      # rejected by GitHub; v4/v5 are the current majors.
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          # 3.9 is end-of-life; pin a supported interpreter
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - run: pytest tests/
      - run: python myflow.py run --environment test

Documentation

Document Your Flows

Add clear docstrings:
from metaflow import FlowSpec, step, Parameter

class WellDocumentedFlow(FlowSpec):
    """
    Production ML pipeline for customer churn prediction.
    
    This flow:
    1. Loads customer data from the data warehouse
    2. Preprocesses features and handles missing values
    3. Trains a gradient boosting model
    4. Evaluates on test set and generates reports
    5. Deploys model if accuracy > 0.85
    
    Example:
        python flow.py run --model-version v2.1.0
    """
    
    model_version = Parameter(
        'model-version',
        help='Semantic version for this model (e.g., v2.1.0)',
        required=True
    )
    
    @step
    def start(self):
        """
        Load raw customer data from Snowflake.
        
        Queries the customers table and filters for active accounts.
        Stores data in self.raw_data.
        """
        # Later steps (preprocess, train, ...) are omitted from this excerpt
        self.raw_data = load_customer_data()
        self.next(self.preprocess)

Add Helpful Messages

Provide user feedback:
@step
def process(self):
    """Process items one by one, reporting progress as we go."""
    print(f"Processing {len(self.data)} items...")
    
    for i, item in enumerate(self.data):
        # Periodic progress line keeps long tasks observable in logs
        if i % 100 == 0:
            print(f"Progress: {i}/{len(self.data)}")
        process_item(item)
    
    print("Processing complete!")
    self.next(self.end)

Common Pitfalls to Avoid

# Bad: Mutable default — the [] is evaluated once at definition time,
# so every call mutates the same shared list object
@step
def process(self, items=[]):
    items.append(1)  # Shared across calls!
    self.items = items

# Good: Use None and create new
@step
def process(self, items=None):
    if items is None:
        items = []  # fresh list per call
    items.append(1)
    self.items = items
# Bad: Can't serialize — artifacts are pickled between steps, and live
# handles (connections, sockets, file objects) cannot be pickled
@step
def start(self):
    self.db_connection = create_connection()  # Won't serialize!

# Good: Store connection info (plain data pickles fine)
@step
def start(self):
    self.db_config = {'host': 'localhost', 'port': 5432}
    # Create connection when needed, inside the step that uses it
    conn = create_connection(self.db_config)
# Bad: Missing next() — Metaflow cannot determine the successor step,
# so the flow fails validation before it even runs
@step
def start(self):
    self.data = load_data()
    # Forgot self.next()!

# Good: Always call next() as the last statement of a step
@step
def start(self):
    self.data = load_data()
    self.next(self.process)  # Required!
# Bad: Global variable — each task may run in a separate process or
# container, so module-level state is not shared between steps
GLOBAL_CACHE = {}

@step
def start(self):
    GLOBAL_CACHE['key'] = 'value'  # Won't work across tasks!

# Good: Use self attributes — artifacts flow between steps
@step
def start(self):
    self.cache = {'key': 'value'}  # Properly serialized

Performance Tips

Minimize Data Movement

# Bad: Unnecessary data copying
@step
def process(self):
    data_copy = self.large_data.copy()  # Expensive! Doubles memory use
    process(data_copy)

# Good: Process in place or pass a reference
@step
def process(self):
    process(self.large_data)  # No copy needed

Batch Operations

# Bad: One at a time
@step
def process(self):
    for item in self.items:
        result = api_call(item)  # Slow! One round-trip per item

# Good: Batch processing — amortize the per-call overhead
@step
def process(self):
    results = batch_api_call(self.items)  # Much faster!

Use Appropriate Data Structures

# Bad: List for lookups — membership scans the whole list
self.items = [1, 2, 3, 4, 5]  # O(n) lookup
if x in self.items:  # Slow for large lists
    pass

# Good: Set for lookups — hash-based membership
self.items = {1, 2, 3, 4, 5}  # O(1) average lookup
if x in self.items:  # Fast!
    pass

Next Steps

Debugging

Debug flows effectively

Testing

Write comprehensive tests

Versioning

Track and manage versions

Tagging

Organize runs with tags

Build docs developers (and LLMs) love