The @timeout decorator specifies a maximum execution time for a step. If the step runs longer than the specified timeout, it will be terminated.

Basic Usage

from metaflow import FlowSpec, step, timeout

class MyFlow(FlowSpec):
    @timeout(hours=2)
    @step
    def long_running_task(self):
        # This step will timeout after 2 hours
        import time
        for i in range(100):
            time.sleep(100)
        self.next(self.end)

if __name__ == '__main__':
    MyFlow()

Description

The @timeout decorator is useful for preventing steps from hanging indefinitely. This is especially important for:
  • Steps that might enter infinite loops
  • External API calls that might not respond
  • Long-running computations that need bounded execution time
  • Expensive cloud compute resources that need cost control
When a timeout occurs, a TimeoutException is raised, which can be caught by the @catch decorator.

Parameters

All time parameters are additive. For example, @timeout(hours=1, minutes=30, seconds=45) sets a timeout of 1 hour, 30 minutes, and 45 seconds.
seconds (int, default: 0)
    Number of seconds to wait before timing out.
minutes (int, default: 0)
    Number of minutes to wait before timing out.
hours (int, default: 0)
    Number of hours to wait before timing out.

Examples

Basic Timeout

@timeout(minutes=30)
@step
def process_data(self):
    # Must complete within 30 minutes
    self.result = expensive_computation()
    self.next(self.end)

Combined Time Units

@timeout(hours=2, minutes=30, seconds=15)
@step
def long_task(self):
    # Timeout after 2 hours, 30 minutes, and 15 seconds
    pass

With Retry

@retry(times=3)
@timeout(minutes=10)
@step
def api_call(self):
    # Each attempt times out after 10 minutes
    # Will retry up to 3 times if it times out
    import requests
    response = requests.get('https://slow-api.example.com')
    self.data = response.json()
    self.next(self.process)

With Catch

@catch(var='timeout_error')
@timeout(hours=1)
@step
def graceful_timeout(self):
    # If this times out, continue with fallback
    self.result = slow_computation()
    self.next(self.end)

@step
def end(self):
    if self.timeout_error:
        print("Step timed out, using default values")
        self.result = None
    else:
        print(f"Result: {self.result}")

Cloud Execution

@timeout(hours=2)
@batch(cpu=8, memory=32768)
@step
def batch_with_timeout(self):
    # Prevents runaway costs on cloud compute
    # AWS Batch job will terminate after 2 hours
    pass

Behavior

When a timeout occurs:
  1. A SIGALRM signal is sent to the process
  2. A TimeoutException is raised with:
    • The step name
    • The timeout duration
    • A stack trace showing where the code was when it timed out
  3. The exception can be caught by @catch or cause the step to fail
  4. If combined with @retry, the step will be retried
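The SIGALRM mechanism described above can be sketched in plain Python (Unix only; `StepTimeout` and `run_with_timeout` are illustrative names, not Metaflow internals):

```python
import signal

class StepTimeout(Exception):
    """Stand-in for Metaflow's timeout exception (name is illustrative)."""

def run_with_timeout(fn, seconds):
    # The alarm fires after `seconds`; the handler raises from wherever
    # the code happens to be, which is why the traceback shows the
    # exact line that was executing when the timeout hit.
    def handler(signum, frame):
        raise StepTimeout(f"timed out after {seconds} seconds")
    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)
    try:
        return fn()
    finally:
        signal.alarm(0)                      # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)
```

Calling `run_with_timeout(lambda: time.sleep(10), 1)` raises `StepTimeout` after roughly one second, while a function that returns in time passes its result through unchanged.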

Timeout Stack Trace

When a timeout occurs, you’ll see where your code was executing:
Step my_step timed out after 0 hours, 5 minutes, 0 seconds
Stack when the timeout was raised:
>  File "myflow.py", line 42, in my_step
>    result = slow_function()
>  File "module.py", line 100, in slow_function
>    time.sleep(1000)

Best Practices

  1. Always set timeouts for cloud execution: Prevent runaway costs on @batch or @kubernetes steps
  2. Be generous: Set timeouts higher than expected runtime to account for variability
  3. Consider retries: When using @retry, remember that each retry gets the full timeout
  4. Use with catch: Combine with @catch to gracefully handle timeouts
  5. Test timeout values: Monitor actual execution times and adjust accordingly
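Point 3 above has a concrete cost implication: `@retry(times=N)` allows up to N retries after the first attempt, so a step's worst-case wall-clock time is (N + 1) times the timeout. A quick sanity check (the helper name is illustrative):

```python
def worst_case_minutes(timeout_minutes, retry_times):
    # @retry(times=N) retries up to N times after the first attempt,
    # so the step may run N + 1 times, each with the full timeout.
    attempts = retry_times + 1
    return attempts * timeout_minutes

# @retry(times=3) + @timeout(minutes=10) -> up to 40 minutes total
print(worst_case_minutes(10, 3))
```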

Common Patterns

Defensive Cloud Execution

@catch(var='error')
@retry(times=2)
@timeout(hours=1)
@batch(cpu=4, memory=16384)
@step
def safe_cloud_task(self):
    # Multiple layers of protection
    # - Times out after 1 hour
    # - Retries up to 2 times on failure/timeout
    # - Catches final failure gracefully
    self.result = complex_computation()
    self.next(self.end)

API Calls with Timeout

@timeout(seconds=30)
@step
def quick_api_call(self):
    import requests
    # Timeout if API doesn't respond in 30 seconds
    response = requests.get(url, timeout=25)  # Also set requests timeout
    self.data = response.json()
    self.next(self.process)

Long-Running Job with Checkpoints

@timeout(hours=10)
@step
def incremental_processing(self):
    # Save progress periodically so a timeout loses at most one
    # checkpoint's worth of work. process_batch and
    # save_intermediate_result are placeholders for your own
    # batch-processing and persistence logic.
    for i in range(1000):
        result = process_batch(i)
        if i % 100 == 0:
            save_intermediate_result(result)
    self.next(self.end)

Combining with Other Decorators

Decorator order matters: @timeout typically belongs near the top of the stack, just below @catch and @retry:
@catch(var='error')      # 1. Outermost error handling
@retry(times=3)          # 2. Retry logic
@timeout(hours=1)        # 3. Timeout
@batch(cpu=4)            # 4. Execution environment
@step                    # 5. Always last
def my_step(self):
    pass

Timeout vs. External Timeouts

When calling external services, set both Metaflow timeout and library-specific timeouts:
@timeout(minutes=5)
@step
def api_call(self):
    import requests
    # requests timeout should be shorter than @timeout
    response = requests.get(url, timeout=240)  # 4 minutes
    self.data = response.json()
    self.next(self.end)

Minimum Timeout for Cloud Execution

  • AWS Batch: Minimum 60 seconds
  • Kubernetes: Minimum 60 seconds
