The @catch decorator makes a step count as successful even if it raises an exception. Once any retries are exhausted, it catches the exception, optionally stores it in an artifact, and lets the flow continue to the next step.
Basic Usage
```python
from metaflow import FlowSpec, step, catch

class MyFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.risky_step)

    @catch(var='error')
    @step
    def risky_step(self):
        # This step fails, but @catch lets the flow continue
        self.result = 1 / 0  # Raises ZeroDivisionError
        self.next(self.end)

    @step
    def end(self):
        if self.error:
            print(f"Step failed: {self.error}")
        else:
            print(f"Success: {self.result}")

if __name__ == '__main__':
    MyFlow()
```
Description
The @catch decorator provides graceful error handling by:
- Catching any exception raised in the step
- Optionally storing the exception in an artifact
- Allowing the flow to continue to downstream steps
- Enabling conditional logic based on success or failure
This is particularly useful for workflows where some failures are acceptable, or when you want to aggregate results from multiple parallel branches where some might fail.
Parameters
var (str, optional, default None)
Name of the artifact in which to store the caught exception. If not specified, the exception is caught but not stored.
print_exception (bool, default True)
Whether to print the exception to stdout when it is caught. Set to False to suppress the exception output.
Examples
Basic Exception Handling
```python
@catch(var='error')
@step
def process(self):
    self.result = risky_operation()
    self.next(self.check_result)

@step
def check_result(self):
    if self.error:
        print("Processing failed, using default")
        self.result = default_value()
    print(f"Result: {self.result}")
    self.next(self.end)
```
Silent Error Handling
```python
@catch(var='error', print_exception=False)
@step
def optional_step(self):
    # The exception won't be printed to the logs
    self.optional_data = fetch_optional_data()
    self.next(self.end)
```
Multiple Parallel Steps with Catch
```python
@step
def start(self):
    self.urls = ['url1', 'url2', 'url3', 'url4']
    self.next(self.fetch, foreach='urls')

@catch(var='fetch_error')
@step
def fetch(self):
    import requests
    # Some URLs might fail
    self.data = requests.get(self.input).json()
    self.next(self.join)

@step
def join(self, inputs):
    branches = list(inputs)
    # Aggregate results, skipping failed fetches (they never set self.data)
    self.results = [inp.data for inp in branches if not inp.fetch_error]
    print(f"Successfully fetched {len(self.results)} out of {len(branches)}")
    self.next(self.end)
```
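The join step only reads data from branches whose error artifact is empty, because a failed fetch raises before it assigns self.data. Below is a plain-Python sketch of that filtering. It uses types.SimpleNamespace objects as stand-ins for the join step's inputs. partition_branches is a hypothetical helper, not part of Metaflow.

```python
from types import SimpleNamespace

def partition_branches(inputs):
    """Hypothetical helper: return (successful data, number of failed branches)."""
    branches = list(inputs)  # materialize once so we can both iterate and count
    results = [b.data for b in branches if b.fetch_error is None]
    failures = len(branches) - len(results)
    return results, failures

# Stand-ins for foreach branches: a failed branch has no `data` attribute at all
inputs = [
    SimpleNamespace(data={"id": 1}, fetch_error=None),
    SimpleNamespace(fetch_error="ConnectionError"),
    SimpleNamespace(data={"id": 3}, fetch_error=None),
]
results, failures = partition_branches(inputs)
print(f"Successfully fetched {len(results)} out of {len(results) + failures}")
# prints "Successfully fetched 2 out of 3"
```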
With Retry
```python
from metaflow import catch, retry, step

@catch(var='error')
@retry(times=3)
@step
def resilient_step(self):
    # One initial attempt plus up to 3 retries;
    # if every attempt fails, @catch catches the exception
    self.result = flaky_operation()
    self.next(self.end)

@step
def end(self):
    if self.error:
        print("All retries exhausted, handling gracefully")
    else:
        print(f"Success: {self.result}")
```
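The ordering here is retries first, then catch. This sketch assumes that @retry(times=3) means one initial attempt plus three retries, and that @catch only steps in after the last attempt fails. The loop below models that ordering in plain Python. It is not Metaflow's scheduler, which runs each attempt as a separate task. run_with_retries and make_flaky are hypothetical names.

```python
def run_with_retries(operation, times):
    """Call `operation` once, then retry it up to `times` more times.

    Returns (result, error, attempts). `error` is None on success and holds
    the last exception if every attempt failed, which is where a catch-style
    handler would take over.
    """
    last_error = None
    for attempt in range(1, times + 2):  # attempts 1 .. times + 1
        try:
            return operation(), None, attempt
        except Exception as exc:
            last_error = exc
    return None, last_error, times + 1


def make_flaky(failures_before_success):
    """Build a hypothetical operation that fails a fixed number of times."""
    state = {"calls": 0}

    def operation():
        state["calls"] += 1
        if state["calls"] <= failures_before_success:
            raise RuntimeError(f"transient failure #{state['calls']}")
        return "ok"

    return operation


print(run_with_retries(make_flaky(2), times=3))   # succeeds on attempt 3
print(run_with_retries(make_flaky(10), times=3))  # all 4 attempts fail
```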
With Timeout
```python
from metaflow import catch, step, timeout

@catch(var='timeout_error')
@timeout(minutes=10)
@step
def timed_operation(self):
    # If the operation takes too long, @catch handles the timeout
    self.result = long_running_task()
    self.next(self.end)

@step
def end(self):
    if self.timeout_error:
        print("Operation timed out")
        self.result = use_cached_result()
    print(f"Result: {self.result}")
```
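Metaflow enforces @timeout itself. The sketch below models only the fallback half of this pattern, in plain Python, and it swaps in a different mechanism: a worker thread with Future.result(timeout=...). That mechanism stops waiting for the slow call but cannot stop the call itself. run_with_timeout, slow_task, and use_cached_result are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def run_with_timeout(task, seconds, fallback):
    """Return (value, timed_out), using fallback() if task exceeds `seconds`."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(task)
    try:
        return future.result(timeout=seconds), False
    except FutureTimeout:
        return fallback(), True
    finally:
        # Don't block on a slow task; Python threads cannot be killed,
        # so it keeps running in the background until it returns.
        executor.shutdown(wait=False)

def slow_task():
    time.sleep(0.5)
    return "fresh"

def use_cached_result():
    return "cached"

print(run_with_timeout(slow_task, 0.05, use_cached_result))        # ('cached', True)
print(run_with_timeout(lambda: "fast", 1.0, use_cached_result))    # ('fast', False)
```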
Exception Object
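The caught exception is stored as a MetaflowExceptionWrapper object, which keeps the details of the original exception as strings:
- type: the fully qualified class name of the exception, for example builtins.ValueError
- exception: the exception message
- stacktrace: the formatted traceback, or None if no traceback was available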
```python
@catch(var='error')
@step
def my_step(self):
    raise ValueError("Something went wrong")
    self.next(self.check_error)  # Never reached, but still declares the transition

@step
def check_error(self):
    if self.error:
        print(f"Exception type: {self.error.type}")
        print(f"Exception message: {self.error.exception}")
        print(f"Traceback: {self.error.stacktrace}")
    self.next(self.end)
```
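Reducing an exception to strings keeps the stored value picklable, even when the live exception object carries something that pickle cannot handle. Below is a hypothetical, stdlib-only model of that idea. summarize_exception returns a plain dict with type, exception, and stacktrace keys that mirror the fields listed above. It is not Metaflow's implementation.

```python
import pickle
import threading
import traceback

def summarize_exception(exc):
    """Hypothetical helper: reduce an exception to picklable strings."""
    exc_cls = exc.__class__
    stacktrace = None
    if exc.__traceback__ is not None:
        stacktrace = "".join(
            traceback.format_exception(exc_cls, exc, exc.__traceback__)
        )
    return {
        "type": f"{exc_cls.__module__}.{exc_cls.__qualname__}",
        "exception": str(exc),
        "stacktrace": stacktrace,
    }

try:
    error = RuntimeError("Something went wrong")
    error.lock = threading.Lock()  # an attribute that pickle cannot serialize
    raise error
except RuntimeError as exc:
    summary = summarize_exception(exc)
    try:
        pickle.dumps(exc)
        raw_picklable = True
    except TypeError:
        raw_picklable = False

restored = pickle.loads(pickle.dumps(summary))  # plain strings round-trip fine
print(raw_picklable)          # False
print(restored["type"])       # builtins.RuntimeError
print(restored["exception"])  # Something went wrong
```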
Behavior
When @catch is present:
- On success: the exception variable (if specified) is set to None, and the flow continues normally
- On failure, after all retries are exhausted:
  - The exception is caught
  - The exception is stored in the specified variable (if var is provided)
  - The exception is printed (if print_exception=True)
  - self.next() is called automatically with the normal transition
- Downstream steps can check the exception variable
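The artifact-setting rules above can be modeled with an ordinary Python decorator on a plain class. catch_sim is a hypothetical stand-in, not Metaflow's decorator. It ignores retries and the automatic self.next() transition. On success it resets the named attribute to None; on failure it stores the exception and, if print_exception is true, prints the traceback.

```python
import functools
import traceback

def catch_sim(var=None, print_exception=True):
    """Hypothetical decorator mimicking the success/failure rules above."""
    def decorate(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            try:
                result = method(self, *args, **kwargs)
            except Exception as exc:
                if print_exception:
                    traceback.print_exc()  # prints the traceback to stderr
                if var:
                    setattr(self, var, exc)  # failure: store the exception
                return None
            if var:
                setattr(self, var, None)  # success: clear the error variable
            return result
        return wrapper
    return decorate


class Demo:
    @catch_sim(var="error", print_exception=False)
    def divide(self, a, b):
        self.result = a / b

d = Demo()
d.divide(6, 3)
print(d.error, d.result)       # None 2.0
d.divide(1, 0)
print(type(d.error).__name__)  # ZeroDivisionError
```

The failed call never reassigns self.result, so the object still holds 2.0 from the earlier call. In a flow, this is why downstream steps should check the error variable before reading artifacts that the failing step was supposed to produce.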
Best Practices
- Use for acceptable failures: only use @catch when it’s okay for the step to fail
- Always check the error: in downstream steps, check whether the error variable is set before using artifacts the failed step may not have produced
- Combine with foreach: useful for parallel operations where some failures are expected
- Name error variables clearly: use descriptive names like download_error, api_error, etc.
- Consider retry first: combine @retry with @catch so that transient failures are retried before the exception is caught
Limitations
- Not supported on foreach split steps: @catch cannot be used on steps that create foreach branches
- Not supported on switch steps: @catch cannot be used on conditional branching steps
- Not supported with @parallel on Kubernetes: the combination of @catch and @parallel is not supported on Kubernetes
Common Patterns
Optional Data Enrichment
```python
@step
def process(self):
    self.core_data = load_data()
    self.next(self.enrich)

@catch(var='enrich_error', print_exception=False)
@step
def enrich(self):
    # Try to enrich the data, but continue if enrichment fails
    self.enriched = call_external_api(self.core_data)
    self.next(self.analyze)

@step
def analyze(self):
    data = self.enriched if not self.enrich_error else self.core_data
    self.analysis = run_analysis(data)  # run_analysis: your own analysis function
    self.next(self.end)
```
Aggregating Partial Results
```python
@step
def start(self):
    self.tasks = range(100)
    self.next(self.process, foreach='tasks')

@catch(var='error')
@retry(times=2)
@step
def process(self):
    self.result = process_task(self.input)
    self.next(self.join)

@step
def join(self, inputs):
    # Separate successful and failed tasks
    successful = [inp for inp in inputs if not inp.error]
    failed = [inp for inp in inputs if inp.error]
    self.success_count = len(successful)
    self.failure_count = len(failed)
    self.results = [inp.result for inp in successful]
    print(f"Success: {self.success_count}, Failed: {self.failure_count}")
    self.next(self.end)
```
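In a large foreach, it often helps to know why branches failed, not just how many did. The sketch below assumes that each caught-exception object exposes the exception's class name as a string attribute called type. That attribute is an assumption of this example. The sketch groups failures with collections.Counter, using SimpleNamespace objects as stand-ins for the join inputs. failure_summary is a hypothetical helper.

```python
from collections import Counter
from types import SimpleNamespace

def failure_summary(inputs, var="error"):
    """Hypothetical helper: count failed branches by exception type.

    Assumes each input has an attribute named `var` that is None on success
    and otherwise carries a string `type` attribute.
    """
    return Counter(
        getattr(inp, var).type
        for inp in inputs
        if getattr(inp, var) is not None
    )

inputs = [
    SimpleNamespace(error=None, result=1),
    SimpleNamespace(error=SimpleNamespace(type="builtins.ValueError")),
    SimpleNamespace(error=SimpleNamespace(type="builtins.TimeoutError")),
    SimpleNamespace(error=SimpleNamespace(type="builtins.ValueError")),
]
print(failure_summary(inputs).most_common())
# [('builtins.ValueError', 2), ('builtins.TimeoutError', 1)]
```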
Graceful Degradation
```python
@catch(var='ml_error')
@step
def ml_prediction(self):
    # Try to use the ML model
    self.prediction = ml_model.predict(self.data)
    self.next(self.decide)

@step
def decide(self):
    if self.ml_error:
        # Fall back to a rule-based system
        print("ML model failed, using rule-based fallback")
        self.prediction = rule_based_predict(self.data)
    self.next(self.end)
```
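The flow above splits "try the model" and "fall back to rules" into separate steps. Collapsed into a single plain-Python function, which can be handy for unit-testing the fallback rule outside a flow, the same logic might look like the sketch below. predict_with_fallback, broken_model, and the toy rule are hypothetical. The function returns the error explicitly because, unlike the two-step flow, nothing else would record it.

```python
def predict_with_fallback(primary, fallback, data):
    """Hypothetical helper: try `primary`, use `fallback` on any exception.

    Returns (prediction, source, error) so callers can record which path ran.
    """
    try:
        return primary(data), "model", None
    except Exception as exc:
        return fallback(data), "rules", exc


def broken_model(data):
    raise RuntimeError("model file missing")  # simulated model failure

def rule_based_predict(data):
    return sum(data) > 10  # toy rule standing in for a real rule set

prediction, source, error = predict_with_fallback(broken_model, rule_based_predict, [4, 5, 6])
print(prediction, source)  # True rules
```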
Checking for Exceptions
Because the exception variable is set to None when the step succeeds, you can also test for a caught exception explicitly:
```python
@catch(var='error')
@step
def risky(self):
    self.result = risky_operation()
    self.next(self.check)

@step
def check(self):
    # Check whether an exception was caught
    if self.error is None:
        print("Step succeeded")
    else:
        print(f"Step failed: {self.error}")
    self.next(self.end)
```
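This page uses two styles of check: "if self.error:" and "if self.error is None:". The two agree, assuming the stored object behaves like an ordinary exception instance. Such an instance defines neither __bool__ nor __len__, so it is always truthy, even when its message is empty. The StoredError class below is a hypothetical stand-in that demonstrates this in plain Python.

```python
class StoredError(Exception):
    """Hypothetical stand-in for the object stored in the error artifact."""

for error in (None, StoredError("boom"), StoredError()):
    # Truthiness and an explicit None check give the same answer here
    print(repr(error), bool(error), error is not None)
# None False False
# StoredError('boom') True True
# StoredError() True True
```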
See Also