The @catch decorator ensures that a step always succeeds, even if an exception is raised. It catches exceptions and optionally stores them in an artifact, allowing the flow to continue.

Basic Usage

from metaflow import FlowSpec, step, catch

class MyFlow(FlowSpec):
    @step
    def start(self):
        # Every flow needs a start step
        self.next(self.risky_step)

    @catch(var='error')
    @step
    def risky_step(self):
        # This step might fail, but the flow continues
        result = 1 / 0  # Raises ZeroDivisionError
        self.result = result
        self.next(self.end)

    @step
    def end(self):
        if self.error:
            print(f"Step failed: {self.error}")
        else:
            print(f"Success: {self.result}")

if __name__ == '__main__':
    MyFlow()

Description

The @catch decorator provides graceful error handling by:
  1. Catching any exception raised in the step
  2. Optionally storing the exception in an artifact
  3. Allowing the flow to continue to downstream steps
  4. Enabling conditional logic based on success or failure
This is particularly useful for workflows where some failures are acceptable, or when you want to aggregate results from multiple parallel branches where some might fail.
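
The four points above can be mimicked in plain Python. The sketch below is only an analogy: catch_like is a hypothetical decorator written for illustration, not Metaflow's implementation, and it works on an ordinary object rather than on a flow.

```python
import functools
import sys
import traceback
from types import SimpleNamespace


def catch_like(var=None, print_exception=True):
    """Hypothetical stand-in that mimics the documented @catch semantics."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(state):
            try:
                func(state)
            except Exception as exc:
                if print_exception:
                    traceback.print_exc(file=sys.stdout)
                if var is not None:
                    setattr(state, var, exc)   # failure: store the exception
            else:
                if var is not None:
                    setattr(state, var, None)  # success: variable is None
        return wrapper
    return decorator


@catch_like(var="error", print_exception=False)
def risky(state):
    state.result = 1 / 0  # raises ZeroDivisionError before result is set


@catch_like(var="error", print_exception=False)
def safe(state):
    state.result = 42


failed = SimpleNamespace()
risky(failed)

ok = SimpleNamespace()
safe(ok)
```

After the failing call, failed.error holds the exception and failed.result was never assigned, which is why downstream code should check the error variable before reading results.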

Parameters

var (str, default: None)
Name of the artifact in which to store the caught exception. If not specified, the exception is caught but not stored.

print_exception (bool, default: True)
Whether to print the exception to stdout when caught. Set to False to suppress exception output.

Examples

Basic Exception Handling

@catch(var='error')
@step
def process(self):
    self.result = risky_operation()
    self.next(self.check_result)

@step
def check_result(self):
    if self.error:
        print("Processing failed, using default")
        self.result = default_value()
    print(f"Result: {self.result}")
    self.next(self.end)

Silent Error Handling

@catch(var='error', print_exception=False)
@step
def optional_step(self):
    # Exception won't be printed to logs
    self.optional_data = fetch_optional_data()
    self.next(self.end)

Multiple Parallel Steps with Catch

@step
def start(self):
    self.urls = ['url1', 'url2', 'url3', 'url4']
    self.next(self.fetch, foreach='urls')

@catch(var='fetch_error')
@step
def fetch(self):
    import requests
    # Some URLs might fail
    self.data = requests.get(self.input).json()
    self.next(self.join)

@step
def join(self, inputs):
    # Aggregate results, skipping failed fetches
    branches = list(inputs)
    self.results = [
        inp.data for inp in branches
        if not inp.fetch_error
    ]
    print(f"Successfully fetched {len(self.results)} out of {len(branches)}")
    self.next(self.end)

With Retry

@catch(var='error')
@retry(times=3)
@step
def resilient_step(self):
    # Retried up to 3 times after the first attempt (4 attempts in total)
    # If every attempt fails, @catch stores the exception
    self.result = flaky_operation()
    self.next(self.end)

@step
def end(self):
    if self.error:
        print("All retries exhausted, handling gracefully")
    else:
        print(f"Success: {self.result}")

With Timeout

@catch(var='timeout_error')
@timeout(minutes=10)
@step
def timed_operation(self):
    # If operation takes too long, catch the timeout
    self.result = long_running_task()
    self.next(self.end)

@step  
def end(self):
    if self.timeout_error:
        print("Operation timed out")
        self.result = use_cached_result()
    print(f"Result: {self.result}")

Exception Object

The caught exception is stored as a MetaflowExceptionWrapper object. It keeps string copies of the original exception in three attributes: type (the exception class name), exception (the message), and stacktrace (the formatted traceback).
@catch(var='error')
@step
def my_step(self):
    raise ValueError("Something went wrong")
    # Never reached at runtime, but it still declares the transition
    self.next(self.check_error)

@step
def check_error(self):
    if self.error:
        print(f"Exception type: {self.error.type}")
        print(f"Exception message: {self.error.exception}")
        print(f"Traceback: {self.error.stacktrace}")
    self.next(self.end)
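
Because artifacts are persisted between steps, a record made of plain strings is easy to store and to inspect later, even where the original exception class cannot be imported. The sketch below illustrates that idea in plain Python. summarize_exception is a hypothetical helper and its dictionary keys are illustrative only; this is not how Metaflow builds its wrapper.

```python
import json
import traceback


def summarize_exception(exc):
    """Hypothetical helper: reduce an exception to plain strings."""
    return {
        "type": type(exc).__name__,
        "message": str(exc),
        "stacktrace": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


try:
    raise ValueError("Something went wrong")
except ValueError as exc:
    summary = summarize_exception(exc)

# A string-only record survives a serialization round trip unchanged
restored = json.loads(json.dumps(summary))
```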

Behavior

When @catch is present:
  1. On success: The exception variable (if specified) is set to None, and the flow continues normally
  2. On failure: After all retries are exhausted:
    • The exception is caught
    • The exception is stored in the specified variable (if var is provided)
    • The exception is printed (if print_exception=True)
    • self.next() is called automatically with the normal transition
    • Downstream steps can check the exception variable
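
The ordering described above (retry first, catch only the final failure) can be sketched in plain Python. run_with_retry_then_catch is a hypothetical helper written for this illustration, not part of Metaflow. It assumes that times counts retries after the first attempt, so times=3 allows up to four attempts.

```python
def run_with_retry_then_catch(task, times=3):
    """Hypothetical helper: retry `task`, then catch the final failure.

    Returns a (result, error, attempts) tuple.
    """
    attempts = 0
    last_error = None
    for _ in range(times + 1):  # first attempt plus `times` retries
        attempts += 1
        try:
            return task(), None, attempts  # success: error is None
        except Exception as exc:
            last_error = exc  # remember the failure and try again
    # Every attempt failed: hand back the last exception instead of raising
    return None, last_error, attempts


calls = {"n": 0}


def flaky():
    # Fails twice, then succeeds on the third call
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


def always_fails():
    raise ValueError("permanent")


flaky_outcome = run_with_retry_then_catch(flaky, times=3)
failed_outcome = run_with_retry_then_catch(always_fails, times=3)
```

The transient failure is absorbed by the retries and never reaches the catch stage. Only the task that fails on every attempt ends up with a stored error.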

Best Practices

  1. Use for acceptable failures: Only use @catch when it’s okay for the step to fail
  2. Always check the error: In downstream steps, check if the error variable is set
  3. Combine with foreach: Useful for parallel operations where some failures are expected
  4. Name error variables clearly: Use descriptive names like download_error, api_error, etc.
  5. Consider retry first: Combine @retry with @catch so that transient failures are retried before the exception is caught

Limitations

  • Not supported on foreach split steps: Cannot use @catch on steps that create foreach branches
  • Not supported on switch steps: Cannot use @catch on conditional branching steps
  • Not supported with @parallel on Kubernetes: The combination of @catch and @parallel is not supported on Kubernetes

Common Patterns

Optional Data Enrichment

@step
def process(self):
    self.core_data = load_data()
    self.next(self.enrich)

@catch(var='enrich_error', print_exception=False)
@step
def enrich(self):
    # Try to enrich data, but continue if enrichment fails
    self.enriched = call_external_api(self.core_data)
    self.next(self.analyze)

@step
def analyze(self):
    data = self.enriched if not self.enrich_error else self.core_data
    self.analysis = analyze(data)
    self.next(self.end)

Aggregating Partial Results

@step
def start(self):
    self.tasks = range(100)
    self.next(self.process, foreach='tasks')

@catch(var='error')
@retry(times=2)
@step
def process(self):
    self.result = process_task(self.input)
    self.next(self.join)

@step
def join(self, inputs):
    # Separate successful and failed tasks
    successful = [inp for inp in inputs if not inp.error]
    failed = [inp for inp in inputs if inp.error]
    
    self.success_count = len(successful)
    self.failure_count = len(failed)
    self.results = [inp.result for inp in successful]
    
    print(f"Success: {self.success_count}, Failed: {self.failure_count}")
    self.next(self.end)
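
When only a limited share of failures is acceptable, the join logic can enforce a failure budget. The following is a minimal sketch in plain Python, assuming each branch exposes result and error attributes as in the pattern above. collect_results, max_failure_rate, and the SimpleNamespace stand-ins for the join inputs are hypothetical names used for illustration.

```python
from types import SimpleNamespace


def collect_results(inputs, max_failure_rate=0.2):
    """Hypothetical helper: gather successful results, enforce a failure budget."""
    branches = list(inputs)
    successful = [b for b in branches if b.error is None]
    failed = [b for b in branches if b.error is not None]
    failure_rate = len(failed) / len(branches) if branches else 0.0
    if failure_rate > max_failure_rate:
        raise RuntimeError(
            f"{len(failed)} of {len(branches)} branches failed, "
            f"above the allowed rate of {max_failure_rate:.0%}"
        )
    return [b.result for b in successful], failure_rate


# Stand-ins for join inputs: nine successful branches and one failed branch
fake_inputs = [SimpleNamespace(result=i * i, error=None) for i in range(9)]
fake_inputs.append(SimpleNamespace(result=None, error=ValueError("boom")))

results, rate = collect_results(fake_inputs)
```

Inside a join step, the same function could be called with inputs directly, and raising at that point makes the run fail instead of silently continuing with too little data.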

Graceful Degradation

@catch(var='ml_error')
@step
def ml_prediction(self):
    # Try to use ML model
    self.prediction = ml_model.predict(self.data)
    self.next(self.decide)

@step
def decide(self):
    if self.ml_error:
        # Fall back to rule-based system
        print("ML model failed, using rule-based fallback")
        self.prediction = rule_based_predict(self.data)
    self.next(self.end)

Checking for Exceptions

You can check if an exception was caught:
@catch(var='error')
@step
def risky(self):
    self.result = risky_operation()
    self.next(self.check)

@step
def check(self):
    # Check if exception was caught
    if self.error is None:
        print("Step succeeded")
    else:
        print(f"Step failed: {self.error}")
    self.next(self.end)
