The @timeout decorator specifies a maximum execution time for a step. If the step runs longer than the specified timeout, it will be terminated.
Basic Usage
```python
from metaflow import FlowSpec, step, timeout

class MyFlow(FlowSpec):

    @timeout(hours=2)
    @step
    def long_running_task(self):
        # This step will time out after 2 hours
        import time
        for i in range(100):
            time.sleep(100)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    MyFlow()
```
Description
The @timeout decorator is useful for preventing steps from hanging indefinitely. This is especially important for:
- Steps that might enter infinite loops
- External API calls that might not respond
- Long-running computations that need bounded execution time
- Expensive cloud compute resources that need cost control
When a timeout occurs, a TimeoutException is raised, which can be caught by the @catch decorator.
Parameters
All time parameters are additive. For example, @timeout(hours=1, minutes=30, seconds=45) sets a timeout of 1 hour, 30 minutes, and 45 seconds.
- seconds (int, default: 0): Number of seconds to wait before timing out.
- minutes (int, default: 0): Number of minutes to wait before timing out.
- hours (int, default: 0): Number of hours to wait before timing out.
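Because the units are additive, the effective timeout is simply their sum. A quick sketch using the standard library shows the arithmetic for the example above:

```python
from datetime import timedelta

# The effective timeout for @timeout(hours=1, minutes=30, seconds=45)
# is the sum of all three units, expressed here in seconds.
total_seconds = int(timedelta(hours=1, minutes=30, seconds=45).total_seconds())
print(total_seconds)  # 5445
```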
Examples
Basic Timeout
```python
@timeout(minutes=30)
@step
def process_data(self):
    # Must complete within 30 minutes
    self.result = expensive_computation()
    self.next(self.end)
```
Combined Time Units
```python
@timeout(hours=2, minutes=30, seconds=15)
@step
def long_task(self):
    # Times out after 2 hours, 30 minutes, and 15 seconds
    pass
```
With Retry
```python
@retry(times=3)
@timeout(minutes=10)
@step
def api_call(self):
    # Each attempt times out after 10 minutes
    # and is retried up to 3 times if it times out
    import requests
    response = requests.get('https://slow-api.example.com')
    self.data = response.json()
    self.next(self.process)
```
With Catch
```python
@catch(var='timeout_error')
@timeout(hours=1)
@step
def graceful_timeout(self):
    # If this times out, continue with a fallback
    self.result = slow_computation()
    self.next(self.end)

@step
def end(self):
    if self.timeout_error:
        print("Step timed out, using default values")
        self.result = None
    else:
        print(f"Result: {self.result}")
```
Cloud Execution
```python
@timeout(hours=2)
@batch(cpu=8, memory=32768)
@step
def batch_with_timeout(self):
    # Prevents runaway costs on cloud compute:
    # the AWS Batch job will terminate after 2 hours
    pass
```
Behavior
When a timeout occurs:
- A SIGALRM signal is sent to the process
- A TimeoutException is raised with:
  - The step name
  - The timeout duration
  - A stack trace showing where the code was when it timed out
- The exception can be caught by @catch or cause the step to fail
- If combined with @retry, the step will be retried
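To illustrate the mechanism, here is a simplified, standalone sketch of a SIGALRM-based timeout in plain Python (not Metaflow's actual implementation; SIGALRM is Unix-only and must be armed from the main thread):

```python
import signal
import time

class TimeoutException(Exception):
    pass

def _alarm_handler(signum, frame):
    # Called when SIGALRM fires; raising here interrupts the running code
    raise TimeoutException("timed out")

signal.signal(signal.SIGALRM, _alarm_handler)
signal.alarm(1)  # deliver SIGALRM after 1 second
try:
    time.sleep(5)  # stands in for a long-running step body
except TimeoutException:
    print("caught timeout")
finally:
    signal.alarm(0)  # disarm the alarm
```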
Timeout Stack Trace
When a timeout occurs, you’ll see where your code was executing:
```
Step my_step timed out after 0 hours, 5 minutes, 0 seconds
Stack when the timeout was raised:
> File "myflow.py", line 42, in my_step
>   result = slow_function()
> File "module.py", line 100, in slow_function
>   time.sleep(1000)
```
Best Practices
- Always set timeouts for cloud execution: Prevent runaway costs on @batch or @kubernetes steps
- Be generous: Set timeouts higher than expected runtime to account for variability
- Consider retries: When using @retry, remember that each retry gets the full timeout
- Use with catch: Combine with @catch to gracefully handle timeouts
- Test timeout values: Monitor actual execution times and adjust accordingly
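A consequence of the retry interaction above: since @retry(times=3) allows three retries in addition to the original attempt, and each attempt gets the full timeout, the worst-case wall-clock time is the product of the two:

```python
# Worst-case wall-clock time for @retry(times=3) + @timeout(minutes=10):
# one original attempt plus up to 3 retries, each with the full 10-minute budget.
retries = 3
timeout_minutes = 10
worst_case_minutes = (1 + retries) * timeout_minutes
print(worst_case_minutes)  # 40
```

Budget for this total when scheduling downstream steps or setting scheduler-level deadlines.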
Common Patterns
Defensive Cloud Execution
```python
@catch(var='error')
@retry(times=2)
@timeout(hours=1)
@batch(cpu=4, memory=16384)
@step
def safe_cloud_task(self):
    # Multiple layers of protection:
    # - times out after 1 hour
    # - retries up to 2 times on failure or timeout
    # - catches the final failure gracefully
    self.result = complex_computation()
    self.next(self.end)
```
API Calls with Timeout
```python
@timeout(seconds=30)
@step
def quick_api_call(self):
    import requests
    # Time out if the API doesn't respond within 30 seconds;
    # also set the shorter requests-level timeout
    response = requests.get(url, timeout=25)
    self.data = response.json()
    self.next(self.process)
```
Long-Running Job with Checkpoints
```python
@timeout(hours=10)
@step
def incremental_processing(self):
    # Save progress periodically
    for i in range(1000):
        result = process_batch(i)
        if i % 100 == 0:
            # Checkpoint progress
            save_intermediate_result(result)
    self.next(self.end)
```
Combining with Other Decorators
Decorator order matters - @timeout should typically be near the top:
```python
@catch(var='error')   # 1. Outermost error handling
@retry(times=3)       # 2. Retry logic
@timeout(hours=1)     # 3. Timeout
@batch(cpu=4)         # 4. Execution environment
@step                 # 5. Always last
def my_step(self):
    pass
```
Timeout vs. External Timeouts
When calling external services, set both Metaflow timeout and library-specific timeouts:
```python
@timeout(minutes=5)
@step
def api_call(self):
    import requests
    # The requests-level timeout should be shorter than @timeout
    response = requests.get(url, timeout=240)  # 4 minutes
    self.data = response.json()
    self.next(self.end)
```
Minimum Timeout for Cloud Execution
- AWS Batch: Minimum 60 seconds
- Kubernetes: Minimum 60 seconds
See Also