
Overview

Metaflow provides several debugging tools to help you troubleshoot issues in your flows. This guide covers debug flags, error inspection, and debugging strategies.

Debug Environment Variables

Metaflow supports debug flags that expose internal command lines and operations. Set these environment variables before running your flow:

Available Debug Flags

# See command lines used to launch subcommands (especially 'step')
export METAFLOW_DEBUG_SUBCOMMAND=1
python myflow.py run
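A second flag, METAFLOW_DEBUG_S3CLIENT=1, shows the command lines used by the S3 client. It also disables automatic directory cleanup, which can fill up disk space quickly, so unset it once you are done debugging.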
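If you launch flows from a script rather than a shell, the same flags can be passed through the child process environment. The following is a minimal sketch of that idea: `debug_env` and `run_with_debug` are hypothetical helpers written for this guide, not part of Metaflow, and the flag names are the ones mentioned above.

```python
import os
import subprocess
import sys


def debug_env(*flags, base=None):
    """Return a copy of the environment with METAFLOW_DEBUG_<FLAG>=1 set.

    Hypothetical helper for illustration; not part of Metaflow.
    """
    env = dict(os.environ if base is None else base)
    for flag in flags:
        env[f"METAFLOW_DEBUG_{flag.upper()}"] = "1"
    return env


def run_with_debug(flow_file, *flags):
    """Run `python <flow_file> run` with the given debug flags enabled."""
    # The flags apply only to this child process, not to the calling shell.
    return subprocess.run([sys.executable, flow_file, "run"], env=debug_env(*flags))
```

Calling `run_with_debug("myflow.py", "subcommand")` should behave like the shell commands above, with the difference that the flag does not leak into later commands.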

Inspecting Failed Tasks

When a task fails, you can inspect it using the Client API:
from metaflow import Flow

# Get the failed run
run = Flow('MyFlow').latest_run

# Find failed tasks
for step in run:
    for task in step:
        if not task.successful:
            print(f"Failed task: {task.pathspec}")
            # task.exception is None when no exception was recorded
            if task.exception is not None:
                print(f"Exception: {task.exception}")
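When a run has many steps, it can help to gather the failures into a list before printing anything. The sketch below wraps the same loop in a function; `collect_failures` is a hypothetical name, and it relies only on the attributes already used above (`successful`, `pathspec`, `exception`).

```python
def collect_failures(run):
    """Return (pathspec, exception) pairs for every unsuccessful task in a run.

    Hypothetical helper: it only assumes that iterating a run yields steps,
    iterating a step yields tasks, and each task exposes `successful`,
    `pathspec`, and `exception`.
    """
    failures = []
    for step in run:
        for task in step:
            if not task.successful:
                failures.append((task.pathspec, task.exception))
    return failures
```

With Metaflow installed, `collect_failures(Flow('MyFlow').latest_run)` would return one entry per failed task, which is easier to sort, filter, or log than interleaved print output.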

Using @catch for Error Handling

The @catch decorator allows flows to continue executing even when a step fails:
from metaflow import FlowSpec, step, catch

class RobustFlow(FlowSpec):
    
    @catch(var='error_info')
    @step
    def process_data(self):
        # This step might fail; risky_operation() is a placeholder for your own code
        result = risky_operation()
        self.result = result
        self.next(self.end)
    
    @step
    def end(self):
        # Check if previous step failed
        if hasattr(self, 'error_info') and self.error_info is not None:
            print(f"Step failed with: {self.error_info}")
            # Handle the error case
        else:
            print(f"Success: {self.result}")

@catch Parameters

  • var (str): Name of the artifact to store the caught exception
  • print_exception (bool, default True): Whether to print the exception to stdout
The @catch decorator is not supported on foreach split steps or switch steps.

Debugging with Retry

Combine @retry with @catch for robust error handling:
from metaflow import FlowSpec, step, retry, catch

class RetryFlow(FlowSpec):
    
    @retry(times=3, minutes_between_retries=2)
    @catch(var='final_error')
    @step
    def flaky_step(self):
        # Retries 3 times before @catch handles it
        data = fetch_from_api()  # might fail
        self.data = data
        self.next(self.end)
    
    @step
    def end(self):
        # Check the value, not just the attribute: it is None if no error was caught
        if getattr(self, 'final_error', None) is not None:
            print("All retries exhausted")
        else:
            print(f"Success after retry: {self.data}")

Interactive Debugging

You can use Python’s debugger in your steps:
import pdb

from metaflow import FlowSpec, step

class DebugFlow(FlowSpec):
    
    @step
    def start(self):
        self.data = [1, 2, 3]
        # Set breakpoint for debugging
        pdb.set_trace()
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Data: {self.data}")
Run the flow with python myflow.py run (not with --with batch or other remote execution) to use interactive debuggers.
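A breakpoint left in a step will stall any run that has no terminal attached. One way to guard against that, sketched here, is to break only when an opt-in variable is set and stdin is interactive. `FLOW_DEBUG`, `should_break`, and `debug_here` are made-up names for this example, not Metaflow features.

```python
import os
import pdb
import sys


def should_break(environ=None, stdin=None):
    """Decide whether it is safe to stop at an interactive breakpoint.

    FLOW_DEBUG is a made-up variable name for this sketch, not a Metaflow flag.
    """
    environ = os.environ if environ is None else environ
    stdin = sys.stdin if stdin is None else stdin
    # Some stdin replacements lack isatty(); treat those as non-interactive.
    is_tty = bool(getattr(stdin, "isatty", lambda: False)())
    return environ.get("FLOW_DEBUG") == "1" and is_tty


def debug_here():
    """Drop into pdb in the caller's frame, but only when should_break() allows it."""
    if should_break():
        # Pass the caller's frame so the prompt opens in the step code, not here.
        pdb.Pdb().set_trace(sys._getframe(1))
```

In a step, `debug_here()` would take the place of `pdb.set_trace()`; running with `FLOW_DEBUG=1 python myflow.py run` opts in, and any other run skips the breakpoint.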

Logging for Debugging

Add comprehensive logging to your steps:
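import sys
from metaflow import FlowSpec, Parameter, step

class LoggingFlow(FlowSpec):

    # Illustrative input so the start step has something to log
    source = Parameter('source', default='sample')

    @step
    def start(self):
        print(f"Starting with input: {self.source}", file=sys.stderr)
        self.data = process()  # process() is a placeholder for your own code
        print(f"Processed data: {len(self.data)} items", file=sys.stderr)
        self.next(self.end)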
    
    @step
    def end(self):
        print("Flow completed", file=sys.stderr)
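If plain print calls become hard to scan, Python's standard `logging` module adds timestamps and levels with little extra code. The helper below is a sketch, not a Metaflow API: `get_step_logger` is a hypothetical name, and it writes to stderr by default to match the example above.

```python
import logging
import sys


def get_step_logger(name, stream=None, level=logging.DEBUG):
    """Return a logger that writes timestamped lines to stderr by default.

    Hypothetical helper for illustration; not part of Metaflow.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:  # avoid duplicate lines when called more than once
        handler = logging.StreamHandler(sys.stderr if stream is None else stream)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    logger.propagate = False  # keep output out of the root logger
    return logger
```

Inside a step you might call `log = get_step_logger("LoggingFlow.start")` and then `log.info("Processed %d items", len(self.data))`. Creating the logger inside the step, rather than storing it on `self`, avoids Metaflow trying to persist it as an artifact.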

Inspecting Artifacts

Access and inspect artifacts from previous runs:
from metaflow import Flow

# Get specific run
run = Flow('MyFlow')['123']

# List the artifacts stored by each step
for step in run:
    print(f"Step: {step.id}")
    # Iterating over a task yields its data artifacts
    for artifact in step.task:
        print(f"  Artifact: {artifact.id}")
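To compare two runs, a dictionary of artifact names per step is often more convenient than printed output. This is a small sketch under the same assumptions as the loop above (`step.id`, `step.task`, and an `id` on each artifact); `artifact_names` is a hypothetical helper name.

```python
def artifact_names(run):
    """Map each step name to the sorted names of the artifacts its task stored.

    Hypothetical helper: assumes each step exposes `id` and `task`, and that
    iterating a task yields artifacts with an `id` attribute.
    """
    return {
        step.id: sorted(artifact.id for artifact in step.task)
        for step in run
    }
```

Note that `step.task` returns a single task, so for a foreach step this sketch reports only one of the parallel tasks.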

Common Debugging Patterns

Check Data Shapes

@step
def process(self):
    print(f"Input shape: {self.data.shape}")
    result = transform(self.data)
    print(f"Output shape: {result.shape}")
    self.result = result
    self.next(self.next_step)
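Not every artifact has a `shape` attribute, so a print like the one above fails on lists or dictionaries. A tolerant summary function, sketched here with the hypothetical name `describe`, falls back to `len()` and then to the type name.

```python
def describe(value):
    """Summarise a value's type and size without printing its contents."""
    kind = type(value).__name__
    shape = getattr(value, "shape", None)  # arrays and data frames expose this
    if shape is not None:
        return f"{kind} shape={tuple(shape)}"
    try:
        return f"{kind} len={len(value)}"
    except TypeError:  # scalars, generators, None
        return kind
```

In a step, `print(f"Input: {describe(self.data)}")` then works regardless of whether `self.data` is an array, a list, or a scalar.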

Validate Assumptions

@step
def validate(self):
    assert len(self.data) > 0, "Data should not be empty"
    assert all(x > 0 for x in self.data), "All values should be positive"
    self.next(self.process)
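Keep in mind that `assert` statements are skipped when Python runs with the `-O` flag. If the checks must always run, explicit exceptions are a safer choice. The sketch below mirrors the two assertions above; `require_positive` is a hypothetical helper name.

```python
def require_positive(values, name="data"):
    """Raise ValueError unless `values` is non-empty and every entry is > 0."""
    values = list(values)
    if not values:
        raise ValueError(f"{name} should not be empty")
    bad = [v for v in values if not v > 0]
    if bad:
        raise ValueError(
            f"{name} has {len(bad)} non-positive value(s), e.g. {bad[0]!r}"
        )
    return values
```

The error message includes an offending value, which usually saves a second run just to find out what was wrong.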

Debug Join Steps

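@step
def join(self, inputs):
    branches = list(inputs)  # materialize the inputs so they can be counted
    print(f"Joining {len(branches)} branches")
    for i, inp in enumerate(branches):
        # Each input exposes the artifacts of one incoming branch
        print(f"  Input {i}: data: {inp.data}")
    self.next(self.end)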
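A common cause of join failures is an artifact that only some branches set. Before reading it from every input, you can check which inputs actually carry it. This is a sketch that assumes a missing artifact raises `AttributeError` on access, so `hasattr` works; `inputs_missing` is a hypothetical helper name.

```python
def inputs_missing(inputs, attr):
    """Return the positions of join inputs that do not carry `attr`."""
    return [i for i, inp in enumerate(inputs) if not hasattr(inp, attr)]
```

Inside a join step, `print(inputs_missing(inputs, "data"))` would list the branches to investigate; an empty list means every branch set the artifact.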

Next Steps

Testing Flows

Learn how to write tests for your flows

Best Practices

Follow recommended patterns for production flows
