Skip to main content

Overview

Testing is crucial for maintaining reliable Metaflow flows. This guide covers different testing strategies, from unit tests to integration tests.

Testing Approaches

Metaflow supports multiple testing approaches:
  1. Unit tests - Test individual step functions
  2. Integration tests - Test complete flows end-to-end
  3. Data tests - Test data processing logic
  4. Test harness - Metaflow’s built-in integration test framework

Unit Testing with Pytest

You can test individual functions used in your steps:
# myflow.py
from metaflow import FlowSpec, step

def process_data(data):
    """Return a new list containing every element of *data* doubled."""
    doubled = []
    for value in data:
        doubled.append(value * 2)
    return doubled

class MyFlow(FlowSpec):
    """Minimal linear flow: start -> end.

    The data processing lives in the module-level process_data() so it
    can be unit-tested without executing the flow.
    """

    @step
    def start(self):
        # `self.data` becomes a persisted artifact, readable by later
        # steps and by the Client API after the run finishes.
        self.data = process_data([1, 2, 3])
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Result: {self.data}")
# test_myflow.py
import pytest
from myflow import process_data

def test_process_data():
    """A populated list comes back with each element doubled."""
    assert process_data([1, 2, 3]) == [2, 4, 6]

def test_process_data_empty():
    """Boundary case: an empty input yields an empty output."""
    assert process_data([]) == []

def test_process_data_negative():
    """Negative values are doubled too, keeping their sign."""
    assert process_data([-1, -2]) == [-2, -4]
Run tests with:
pytest test_myflow.py -v

Integration Testing Flows

Test complete flows by executing them and checking results:
# test_integration.py
import subprocess
import json
from metaflow import Flow

def test_flow_execution():
    """The flow should exit cleanly when run end-to-end."""
    completed = subprocess.run(
        ['python', 'myflow.py', 'run'],
        capture_output=True,
        text=True,
    )
    # Include stderr in the failure message for easier diagnosis.
    assert completed.returncode == 0, f"Flow failed: {completed.stderr}"

def test_flow_results():
    """Run the flow and verify the artifact it produced.

    Fix: in the Client API, artifacts are exposed via ``task.data``, not
    as attributes of the Task object itself — the original compared the
    ``MetaflowData`` container to a list, which can never be equal.
    """
    # Run the flow end-to-end in a subprocess.
    subprocess.run(['python', 'myflow.py', 'run'], check=True)

    # Inspect the finished run using the Client API.
    run = Flow('MyFlow').latest_run

    # `task.data.data` is the artifact named `data` set in the start step.
    end_task = run['end'].task
    assert end_task.data.data == [2, 4, 6]

Testing with Parameters

Test flows with different parameter values:
from metaflow import FlowSpec, step, Parameter

class ParameterizedFlow(FlowSpec):
    """Multiply [1, 2, 3] by a user-supplied `multiplier` parameter."""
    
    # Exposed on the command line as `--multiplier`; defaults to 2.
    multiplier = Parameter('multiplier',
                          help='Multiplication factor',
                          default=2)
    
    @step
    def start(self):
        # `self.result` is persisted as an artifact for later inspection.
        self.result = [x * self.multiplier for x in [1, 2, 3]]
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Result: {self.result}")
# test_parameters.py
import subprocess
from metaflow import Flow

def test_with_default_parameter():
    """Default multiplier (2) doubles each input.

    Fix: artifacts hang off ``task.data`` in the Client API; ``task.result``
    is not a valid attribute of the Task object.
    """
    subprocess.run(['python', 'paramflow.py', 'run'], check=True)
    run = Flow('ParameterizedFlow').latest_run
    assert run['end'].task.data.result == [2, 4, 6]

def test_with_custom_parameter():
    """Passing --multiplier 3 triples each input.

    Fix: read the artifact through ``task.data`` (Client API), not as a
    direct Task attribute.
    """
    subprocess.run(
        ['python', 'paramflow.py', 'run', '--multiplier', '3'],
        check=True
    )
    run = Flow('ParameterizedFlow').latest_run
    assert run['end'].task.data.result == [3, 6, 9]

Testing Foreach Steps

Test flows with parallel branches:
from metaflow import FlowSpec, step

class ForeachFlow(FlowSpec):
    """Fan out over a list of items, double each one, then join."""
    
    @step
    def start(self):
        # Each element of `items` spawns one parallel `process` task.
        self.items = [1, 2, 3, 4, 5]
        self.next(self.process, foreach='items')
    
    @step
    def process(self):
        # `self.input` is this branch's element of `items`.
        self.result = self.input * 2
        self.next(self.join)
    
    @step
    def join(self, inputs):
        # `inputs` iterates the finished foreach branches; collect each
        # branch's `result` artifact (branch order is not guaranteed —
        # note the sorted() comparison in the test below).
        self.results = [inp.result for inp in inputs]
        self.next(self.end)
    
    @step
    def end(self):
        print(f"All results: {self.results}")
# test_foreach.py
import subprocess
from metaflow import Flow

def test_foreach_execution():
    """All five foreach branches run and their results are joined.

    Fix: the joined artifact must be read via ``task.data.results`` —
    ``task.results`` is not an attribute of the Task object.
    """
    subprocess.run(['python', 'foreachflow.py', 'run'], check=True)
    run = Flow('ForeachFlow').latest_run

    # Iterating a Step yields one Task per foreach branch.
    assert len(list(run['process'])) == 5

    # Foreach ordering is not guaranteed, so compare sorted values.
    end_task = run['end'].task
    assert sorted(end_task.data.results) == [2, 4, 6, 8, 10]

Testing Error Handling

Test that errors are handled correctly:
from metaflow import FlowSpec, step, catch

class ErrorFlow(FlowSpec):
    """Demonstrate @catch: start raises, yet the run still succeeds."""
    
    @catch(var='error')
    @step
    def start(self):
        # @catch stores the exception in the `error` artifact and lets
        # the flow continue instead of failing the run.
        raise ValueError("Intentional error")
        # Unreachable at runtime, but required: Metaflow builds the flow
        # graph by statically inspecting self.next() calls in the source.
        self.next(self.end)
    
    @step
    def end(self):
        # NOTE: `assert` is fine for a demo; it is stripped under -O.
        assert hasattr(self, 'error'), "Error should be caught"
        print(f"Error handled: {self.error}")
# test_errors.py
import subprocess
from metaflow import Flow

def test_error_is_caught():
    """@catch converts the step failure into an artifact; run succeeds.

    Fix: ``hasattr(end_task, 'error')`` inspected the Task *object*, which
    never exposes artifacts as attributes, so the check could not detect
    the caught error. Artifacts are reached through ``task.data``.
    """
    # The flow should succeed despite the raised ValueError.
    result = subprocess.run(
        ['python', 'errorflow.py', 'run'],
        capture_output=True,
        text=True
    )
    assert result.returncode == 0, f"Flow failed: {result.stderr}"

    # Verify the exception was captured into the `error` artifact.
    run = Flow('ErrorFlow').latest_run
    end_task = run['end'].task
    assert hasattr(end_task.data, 'error')
    assert end_task.data.error is not None

Metaflow Test Harness

Metaflow includes a sophisticated test harness for integration testing. The harness generates test cases by combining:
  • Contexts: Execution environments and configurations
  • Tests: Step function templates with assertions
  • Graphs: Flow graph structures
  • Checkers: Validation methods for different interfaces

Running the Test Harness

cd metaflow/test/core
PYTHONPATH=`pwd`/../../ python run_tests.py --debug --contexts dev-local

Run Specific Tests

PYTHONPATH=`pwd`/../../ python run_tests.py \
  --debug \
  --contexts dev-local \
  --graphs single-linear-step \
  --tests BasicArtifactTest

Data Testing with Pytest

Test data processing logic separately:
# test_data_processing.py
import pytest
import pandas as pd
from myflow import clean_data, validate_data

@pytest.fixture
def sample_data():
    """A tiny three-row frame with `id` and `value` columns."""
    frame = pd.DataFrame({
        'id': [1, 2, 3],
        'value': [10, 20, 30],
    })
    return frame

def test_clean_data(sample_data):
    """Cleaning keeps all rows and the original column order."""
    cleaned = clean_data(sample_data)
    assert len(cleaned) == 3
    assert list(cleaned.columns) == ['id', 'value']

def test_validate_data(sample_data):
    """Well-formed data passes validation without raising."""
    validate_data(sample_data)

def test_validate_data_invalid():
    """A frame missing the expected columns is rejected."""
    bad_frame = pd.DataFrame({'wrong': [1, 2]})
    with pytest.raises(ValueError):
        validate_data(bad_frame)

Mocking External Dependencies

Use mocks to test flow logic without external dependencies. Note that `unittest.mock` patches apply only within the patching process, so the code under test must run in-process:
# myflow.py
import requests
from metaflow import FlowSpec, step

class APIFlow(FlowSpec):
    """Fetch JSON from an external API in the start step."""
    
    @step
    def start(self):
        # External dependency: tests must patch `requests.get` in the
        # same process that executes this code — a patch applied in a
        # parent process does not reach a subprocess run.
        response = requests.get('https://api.example.com/data')
        self.data = response.json()
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Fetched {len(self.data)} items")
# test_api_flow.py
from unittest.mock import patch, Mock
import subprocess

@patch('myflow.requests.get')
def test_api_flow(mock_get):
    """Exercise the flow's fetch logic against a mocked API response.

    Fix: ``unittest.mock`` patches exist only inside the patching
    process. The original version ran the flow with ``subprocess``, so
    the mock was never hit and the test silently called the real API.
    Patch the ``requests`` name as seen by the flow module and drive the
    request logic in-process instead.
    """
    # Fake response object standing in for the real API.
    mock_response = Mock()
    mock_response.json.return_value = [{'id': 1}, {'id': 2}]
    mock_get.return_value = mock_response

    # Exercise the same call the start step makes, in this process.
    import myflow
    data = myflow.requests.get('https://api.example.com/data').json()

    assert data == [{'id': 1}, {'id': 2}]
    mock_get.assert_called_once()

Best Practices for Testing

Extract data processing functions so they can be tested independently:
# Good: Testable function
def calculate_metrics(data):
    return {'mean': sum(data) / len(data)}

class MyFlow(FlowSpec):
    """Example flow whose start step delegates to calculate_metrics()."""

    @step
    def start(self):
        # Business logic stays in the plain function, so tests can call
        # it directly without executing the flow.
        self.metrics = calculate_metrics([1, 2, 3])
        self.next(self.end)  # `end` step omitted in this excerpt
Create reusable test data with pytest fixtures:
@pytest.fixture
def sample_dataset():
    """Shared dataset fixture, loaded from the checked-in CSV."""
    fixture_path = 'test_data.csv'
    return pd.read_csv(fixture_path)

def test_processing(sample_dataset):
    """Processing a non-empty dataset yields at least one row."""
    processed = process(sample_dataset)
    assert len(processed) > 0
Test boundary conditions and error cases:
def test_empty_input():
    """Boundary case: an empty list passes through unchanged."""
    assert process([]) == []

def test_invalid_input():
    """None is rejected with a ValueError."""
    with pytest.raises(ValueError):
        process(None)
Run tests automatically on every commit:
# .github/workflows/test.yml
name: Test Flows
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      # v2 of these actions targets retired Node runtimes and is
      # deprecated on GitHub-hosted runners; v4 is the current major.
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
      - run: pip install -r requirements.txt
      - run: pytest tests/

Next Steps

Debugging

Learn debugging techniques for flows

Best Practices

Follow recommended patterns for production

Build docs developers (and LLMs) love