Skip to main content

Overview

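The @catch decorator lets a flow keep running when a step fails: the step's exception is intercepted (and optionally stored as an artifact) and execution continues with the next step. This is essential for building resilient workflows that handle partial failures gracefully.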

Basic Usage

from metaflow import FlowSpec, step, catch

class ResilientFlow(FlowSpec):
    """Minimal flow whose first step may fail without aborting the run."""

    @catch()
    @step
    def start(self):
        """Run a risky operation; @catch keeps a failure here from stopping the flow."""
        risky_operation()
        self.next(self.end)

    @step
    def end(self):
        """Terminal step, reached whether or not `start` raised."""
        print("Flow completed despite errors")
Without @catch, if risky_operation() fails, the entire flow fails. With @catch, the flow continues to the next step.

Capturing Exceptions

Store the exception in an artifact:
class ErrorCaptureFlow(FlowSpec):
    """Capture a step's exception in an artifact and branch on it downstream."""

    @step
    def start(self):
        # Every flow needs a `start` step; hand off to the risky step.
        self.next(self.process)

    @catch(var='error')
    @step
    def process(self):
        # This might fail; if it does, @catch stores the exception in `self.error`.
        self.result = risky_calculation()
        self.next(self.end)

    @step
    def end(self):
        # `self.error` is None when `process` succeeded.
        if self.error is not None:
            # The artifact is a MetaflowExceptionWrapper, so type(self.error)
            # would name the wrapper; read the original details from its attributes.
            print(f"Step failed with: {self.error.exception}")
            print(f"Exception type: {self.error.type}")
            # Handle the failure case: `self.result` was never assigned.
            self.result = None
        else:
            print(f"Success! Result: {self.result}")
From the implementation:
# From catch_decorator.py:33
var : str, optional, default None
    Name of the artifact in which to store the caught exception.
    If not specified, the exception is not stored.

How @catch Works

The decorator intercepts a failure only after all retries are exhausted; earlier failed attempts are left alone so they can be retried:
# From catch_decorator.py:76
def task_exception(
    self, exception, step, flow, graph, retry_count, max_user_code_retries
):
    """Handle an exception raised by the step's user code.

    Returns False while user-code retries remain, leaving the failure to be
    retried. On the final attempt it optionally prints the exception, sets the
    step's transition as if self.next() had run, stores a picklable wrapper of
    the exception, and returns True (presumably signalling "handled" to the
    caller — confirm against the Metaflow task runner).
    """
    # Only "catch" exceptions after all retries are exhausted
    if retry_count < max_user_code_retries:
        return False
    
    if self.attributes['print_exception']:
        self._print_exception(step, flow)
    
    # pretend that self.next() was called as usual
    flow._transition = (graph[step].out_funcs, None)
    
    # store the exception
    picklable = MetaflowExceptionWrapper(exception)
    flow._catch_exception = picklable
    # presumably assigns the wrapper to the user's `var` artifact when one was configured
    self._set_var(flow, picklable)
    return True
Key behaviors:
  1. Retries happen first (if configured)
  2. After all retries fail, @catch activates
  3. The exception is stored in the var artifact (if var is specified); when the step succeeds, that artifact is set to None instead
  4. Flow continues to the next step

Catch Parameters

var: Exception Variable

@catch(var='error')
@step
def process(self):
    """Always fail, so @catch records the exception in `self.error`."""
    raise ValueError("Something went wrong")
    # Never reached at runtime; kept so the flow graph still declares the transition.
    self.next(self.end)

@step
def end(self):
    """Report the exception captured by @catch, if any."""
    # `error` is None if the step succeeded and holds the wrapped exception
    # if it failed; compare against None explicitly rather than by truthiness.
    if self.error is not None:
        print(f"Caught: {self.error}")
# Print exception (default)
@catch(var='error', print_exception=True)
@step
def process(self):
    """Fail on purpose; the caught traceback is printed because print_exception=True."""
    raise ValueError("Error")
    self.next(self.end)

# Silent failure: the exception is still stored in `error`, just not printed
@catch(var='error', print_exception=False)
@step
def process(self):
    """Fail on purpose without printing the caught exception."""
    raise ValueError("Error")
    self.next(self.end)
From the source:
# From catch_decorator.py:36
print_exception : bool, default True
    Determines whether or not the exception is printed to
    stdout when caught.

Limitations

No Catch on Foreach Splits

# From catch_decorator.py:48
# Reject @catch on a step whose own transition is a foreach split.
if graph[step].type == 'foreach':
    raise MetaflowException(
        "@catch is defined for the step *%s* "
        "but @catch is not supported in foreach "
        "split steps." % step
    )
You cannot use @catch on the step that creates the foreach:
# This will fail: @catch is rejected on the step that launches the foreach
@catch()
@step
def start(self):
    """Invalid: this step fans out with foreach, so @catch raises MetaflowException."""
    self.items = [1, 2, 3]
    self.next(self.process, foreach='items')
Instead, apply @catch to the step that runs inside the foreach (the step executed once per item):
@step
def start(self):
    """Fan out over three items; the splitting step itself carries no @catch."""
    self.items = [1, 2, 3]
    self.next(self.process, foreach='items')

@catch(var='error')
@step
def process(self):
    """Handle one foreach item; a failure is caught for this task alone."""
    # Each task can fail independently of its siblings
    risky_operation(self.input)
    self.next(self.join)

No Catch on Switch Steps

# From catch_decorator.py:57
# Reject @catch on a conditional (switch) step.
if graph[step].type == 'split-switch':
    raise MetaflowException(
        "@catch is defined for the step *%s* "
        "but @catch is not supported in conditional "
        "switch steps." % step
    )
Likewise, a step that branches conditionally (a switch step) cannot use @catch:
# This will fail: @catch is rejected on a conditional (switch) step
@catch()
@step
def start(self):
    """Invalid: this step branches with a switch, so @catch raises MetaflowException."""
    self.condition = 'a'
    self.next({'a': self.step_a, 'b': self.step_b}, condition='condition')

Error Handling in Foreach

Individual Task Failures

Each foreach task can fail independently:
class ForeachCatchFlow(FlowSpec):
    """Fan out over items, let individual tasks fail, and split results at the join."""

    @step
    def start(self):
        self.items = [1, 2, 3, 4, 5]
        self.next(self.process, foreach='items')

    @catch(var='error')
    @step
    def process(self):
        # Item 3 fails; @catch records the exception in `error` for that task only.
        if self.input == 3:
            raise ValueError("Can't process 3!")
        self.result = self.input * 2
        self.next(self.join)

    @step
    def join(self, inputs):
        # Filter successful and failed tasks
        successful = [inp for inp in inputs if inp.error is None]
        failed = [inp for inp in inputs if inp.error is not None]

        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")

        # Process only successful results (failed tasks never set `result`)
        self.results = [inp.result for inp in successful]
        self.failed_inputs = [inp.input for inp in failed]

        self.next(self.end)

    @step
    def end(self):
        # `join` transitions here, so the flow must define an `end` step.
        print(f"Results: {self.results}")
        print(f"Failed inputs: {self.failed_inputs}")

Handling Multiple Failures

@step
def join(self, inputs):
    """Report every failed foreach task, then keep the successful results."""
    # Map each failed item to the exception @catch recorded for it.
    errors = {inp.input: inp.error for inp in inputs if inp.error is not None}

    if errors:
        print(f"Failed items: {list(errors)}")
        for item, error in errors.items():
            # `error` is a MetaflowExceptionWrapper whose str() is the whole
            # traceback; print just the original exception message here.
            print(f"  {item}: {error.exception}")

    # Process successful items
    self.results = [inp.result for inp in inputs if inp.error is None]

    self.next(self.end)

Error Handling in Branches

Parallel Branch Failures

class BranchCatchFlow(FlowSpec):
    """Run two fallible branches in parallel and combine whatever succeeded."""

    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)

    @catch(var='error_a')
    @step
    def branch_a(self):
        # Might fail; a distinct var name keeps the two branches' errors apart
        self.result_a = unstable_operation()
        self.next(self.join)

    @catch(var='error_b')
    @step
    def branch_b(self):
        # Might also fail
        self.result_b = another_operation()
        self.next(self.join)

    @step
    def join(self, inputs):
        # Check which branches failed (each error artifact is None on success)
        a_failed = inputs.branch_a.error_a is not None
        b_failed = inputs.branch_b.error_b is not None

        if a_failed and b_failed:
            print("Both branches failed!")
            self.result = None
        elif a_failed:
            print("Branch A failed, using B")
            self.result = inputs.branch_b.result_b
        elif b_failed:
            print("Branch B failed, using A")
            self.result = inputs.branch_a.result_a
        else:
            print("Both succeeded!")
            self.result = [
                inputs.branch_a.result_a,
                inputs.branch_b.result_b
            ]

        self.next(self.end)

    @step
    def end(self):
        # `join` transitions here, so the flow must define an `end` step.
        print(f"Combined result: {self.result}")

Retry Behavior

@catch works with @retry:
from metaflow import retry

@retry(times=3)
@catch(var='error')
@step
def process(self):
    """Retried up to 3 times by @retry; @catch steps in only once every attempt has failed."""
    unstable_operation()
    self.next(self.end)
Execution order (one initial attempt plus 3 retries, i.e. 4 attempts in total):
  1. First attempt fails → retry 1
  2. Retry 1 fails → retry 2
  3. Retry 2 fails → retry 3
  4. Retry 3 fails → @catch catches the exception
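@catch requests no user-code retries of its own (the first value). It reserves NUM_FALLBACK_RETRIES extra attempt(s) (the second value), which serve as a fallback when a failure cannot be caught in-process. From the source: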
# From catch_decorator.py:105
def step_task_retry_count(self):
    # NOTE(review): per the Metaflow decorator API this tuple is presumably
    # (user_code_retries, error_retries) — @catch adds no user-code retries
    # and reserves NUM_FALLBACK_RETRIES fallback attempt(s); confirm.
    return 0, NUM_FALLBACK_RETRIES

Exception Information

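The stored exception is wrapped in a picklable MetaflowExceptionWrapper so it can be saved as an artifact. The wrapper is not an instance of the original exception class: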
# From catch_decorator.py:94
# Wrap the raw exception so it can be pickled and stored as an artifact.
picklable = MetaflowExceptionWrapper(exception)
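The wrapper records the original exception's details as attributes: `type` (dotted class name), `exception` (message) and `stacktrace`. Access exception details: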
@step
def end(self):
    """Inspect the wrapped exception stored by @catch(var='error')."""
    if self.error is not None:
        # Original exception message (str(self.error) returns the full traceback)
        print(f"Error: {self.error.exception}")

        # Original exception type as a dotted name, e.g. 'builtins.ValueError'
        print(f"Type: {self.error.type}")

        # The artifact is a MetaflowExceptionWrapper, not an instance of the
        # original class, so isinstance() against it would always be False;
        # compare the recorded type name instead.
        if self.error.type == "builtins.ValueError":
            print("Was a ValueError")

Best Practices

1. Use Specific Error Variables

# Good: Specific names
@catch(var='validation_error')
@step
def validate(self):
    """Check the data; any failure is stored in `validation_error`."""
    check_data()
    self.next(self.process)

@catch(var='processing_error')
@step
def process(self):
    """Transform the data; any failure is stored in `processing_error`."""
    transform_data()
    self.next(self.end)

# Avoid: Generic names shared by several steps
@catch(var='error')
@step
def validate(self):
    """Validation step whose failure lands in the generic `error` artifact."""
    ...

# Name conflict! When process succeeds it sets `error` back to None,
# hiding any failure that validate recorded under the same name.
@catch(var='error')
@step
def process(self):
    """Processing step that reuses the same `error` artifact name."""
    ...

2. Always Check Error State

@catch(var='error')
@step
def process(self):
    """Compute the result; if this raises, `result` is never set and `error` is populated."""
    self.result = compute()
    self.next(self.end)

@step
def end(self):
    """Use the upstream result, or a default when the @catch step failed."""
    # Always check before using artifacts from @catch steps:
    # `result` does not exist if the upstream step raised.
    if self.error is not None:
        print("Processing failed, using default")
        self.result = get_default_result()
    else:
        print(f"Result: {self.result}")

3. Provide Fallback Logic

@catch(var='api_error')
@step
def fetch_from_api(self):
    """Fetch data from the external API; a failure lands in `api_error`."""
    self.data = call_external_api()
    self.next(self.process)

@step
def process(self):
    """Process the API data, substituting the cached copy if the fetch failed."""
    # Fallback: `self.data` was never set upstream if the API call raised.
    if self.api_error is not None:
        print("API failed, using cached data")
        self.data = load_cached_data()

    self.result = process_data(self.data)
    self.next(self.end)

4. Log Failures for Monitoring

@catch(var='error', print_exception=True)
@step
def process(self):
    """Run the risky operation; if it fails, the caught traceback is printed."""
    self.result = risky_operation()
    self.next(self.end)

@step
def end(self):
    """Report a caught failure to an external monitoring system."""
    # `datetime` was used without being imported; import it locally.
    from datetime import datetime, timezone

    if self.error is not None:
        # Log to monitoring system with a timezone-aware UTC timestamp so
        # events from different hosts compare correctly.
        log_to_monitoring({
            'flow': self.name,
            'error': str(self.error),
            'timestamp': datetime.now(timezone.utc),
        })

5. Graceful Degradation

class RobustPipeline(FlowSpec):
    """Run a non-critical enrichment branch beside a critical validation branch."""

    @step
    def start(self):
        # Both branches read `self.data`, so it must be set before the split.
        self.data = load_data()
        self.next(self.enrich, self.validate)

    @catch(var='enrich_error')
    @step
    def enrich(self):
        # Nice to have, but not critical
        self.enriched_data = enrich_data(self.data)
        self.next(self.join)

    @step
    def validate(self):
        # Critical step, no @catch: a failure here fails the whole run
        self.validation_passed = validate_data(self.data)
        if not self.validation_passed:
            raise ValueError("Validation failed!")
        self.next(self.join)

    @step
    def join(self, inputs):
        # Bring branch artifacts (including `enrich_error`) into this step
        self.merge_artifacts(inputs)

        # Use enriched data if available, original otherwise
        if self.enrich_error is None:
            self.final_data = self.enriched_data
        else:
            print("Using non-enriched data")
            self.final_data = self.data

        self.next(self.end)

    @step
    def end(self):
        # `join` transitions here, so the flow must define an `end` step.
        print("Pipeline finished")

Common Patterns

Retry Pattern

from metaflow import retry

@retry(times=3, minutes_between_retries=5)
@catch(var='error')
@step
def call_api(self):
    """Retry up to 3 times, 5 minutes apart (4 attempts in total), then catch if all fail."""
    self.data = fetch_from_api()
    self.next(self.end)

Optional Step Pattern

@catch(var='optional_error', print_exception=False)
@step
def optional_enrichment(self):
    """Best-effort enrichment: a failure is stored in `optional_error` without being printed."""
    self.enriched = enrich_data(self.data)
    self.next(self.end)

@step
def end(self):
    """Fall back to the original data when the optional enrichment failed."""
    # Evaluate the condition once and keep the chosen data as an artifact.
    enriched_ok = self.optional_error is None
    self.final_data = self.enriched if enriched_ok else self.data
    print(f"Using {'enriched' if enriched_ok else 'original'} data")

Best Effort Pattern

@step
def start(self):
    """List candidate models and train each one in its own foreach task."""
    self.models = ['model_a', 'model_b', 'model_c']
    self.next(self.train, foreach='models')

@catch(var='training_error')
@step
def train(self):
    """Train and score one candidate model; a failure is recorded in `training_error`."""
    self.model = train_model(self.input)
    self.score = evaluate(self.model)
    self.next(self.select_best)

@step
def select_best(self, inputs):
    """Pick the highest-scoring model among the tasks that trained successfully."""
    trained = [task for task in inputs if task.training_error is None]

    # Nothing to choose from: fail the join loudly.
    if not trained:
        raise ValueError("All models failed to train!")

    winner = max(trained, key=lambda task: task.score)
    self.best_model, self.best_score = winner.model, winner.score

    print(f"Used {len(trained)}/{len(inputs)} models")
    self.next(self.end)

Circuit Breaker Pattern

@step
def start(self):
    """Fan out one `process` task per item."""
    self.items = range(100)
    self.next(self.process, foreach='items')

@catch(var='error')
@step
def process(self):
    """Process a single item; failures are counted by the join's circuit breaker."""
    self.result = process_item(self.input)
    self.next(self.join)

@step
def join(self, inputs):
    """Abort the run when more than half of the foreach tasks failed."""
    n_failed = sum(inp.error is not None for inp in inputs)
    failure_rate = n_failed / len(inputs)

    # Circuit breaker: tolerate isolated failures, stop on widespread ones.
    if failure_rate > 0.5:
        raise ValueError(
            f"Too many failures: {failure_rate:.1%}. "
            "Circuit breaker activated."
        )

    # Continue with successful results
    self.results = [inp.result for inp in inputs if inp.error is None]
    self.next(self.end)

Debugging Failed Steps

When a step fails but is caught:
@step
def end(self):
    """Print the details @catch recorded about a failed step."""
    if self.error is not None:
        # `self.error` is a MetaflowExceptionWrapper: it carries the original
        # exception's type, message and traceback as attributes.
        print("Error details:")
        print(f"Type: {self.error.type}")
        print(f"Message: {self.error.exception}")

        # Traceback captured when the exception was caught (may be None). It is
        # also printed to the task logs when print_exception=True.
        print(f"Traceback:\n{self.error.stacktrace}")
From the decorator:
# From catch_decorator.py:64
def _print_exception(self, step, flow):
    # Log a header naming the flow, then every line of the traceback of the
    # exception currently being handled (traceback.format_exc()), prefixed
    # with ">  " and without timestamps.
    self.logger(head="@catch caught an exception from %s" % flow, timestamp=False)
    for line in traceback.format_exc().splitlines():
        self.logger(">  %s" % line, timestamp=False)

Next Steps

FlowSpec

Learn more about the FlowSpec base class

Foreach

Handle errors in parallel foreach tasks

Branching

Manage failures across parallel branches

Data Management

Understand artifact behavior with errors

Build docs developers (and LLMs) love