Overview
metaflow-dagster automatically translates Metaflow’s @retry and @timeout decorators into Dagster’s RetryPolicy and op_execution_timeout mechanisms. This ensures your step execution has the same resilience guarantees whether running through Metaflow or Dagster.
Retry Decorator
The @retry decorator on a Metaflow step automatically generates a Dagster RetryPolicy on the corresponding op.
Basic Retry Example
from metaflow import FlowSpec, step, retry

class RobustFlow(FlowSpec):

    @retry(times=3, minutes_between_retries=2)
    @step
    def process(self):
        # This step will retry up to 3 times with 2-minute delays
        result = self.call_external_api()
        self.data = result
        self.next(self.end)

    @step
    def end(self):
        pass
Generated Dagster Code
The compiled Dagster op includes the retry policy:
@op(retry_policy=RetryPolicy(max_retries=3, delay=120))
def op_process(context: OpExecutionContext, upstream: str) -> str:
    # Step execution code
    pass
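The translation from `@retry` arguments to `RetryPolicy` arguments is mechanical. As a sketch, the helper below returns the keyword arguments that would be passed to `RetryPolicy`; the function name is illustrative, not the library's actual API:

```python
def retry_policy_kwargs(times: int, minutes_between_retries: int = 0) -> dict:
    """Map @retry arguments to Dagster RetryPolicy keyword arguments.

    @retry(times=N)                            -> RetryPolicy(max_retries=N)
    @retry(times=N, minutes_between_retries=M) -> RetryPolicy(max_retries=N, delay=M*60)
    """
    kwargs = {"max_retries": times}
    if minutes_between_retries:
        # Dagster's delay is expressed in seconds
        kwargs["delay"] = minutes_between_retries * 60
    return kwargs
```

For the example above, `retry_policy_kwargs(3, 2)` yields `{"max_retries": 3, "delay": 120}`, matching the generated op.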
How Retry Count is Forwarded
Dagster Tracks Retry Attempts
Dagster’s context.retry_number tracks the current retry attempt (0 for the initial attempt, 1 for the first retry, and so on).
Passed to Metaflow
The retry count is forwarded to Metaflow via the --retry-count flag:

python my_flow.py step process \
    --run-id dagster-abc123 \
    --task-id 1 \
    --retry-count 1
Consistent Attempt Numbering
Metaflow sees the same attempt number as Dagster, ensuring consistent behavior for retry-aware code.
This coordination means if your Metaflow code checks current.retry_count, it will see the correct attempt number even when running through Dagster.
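One way to picture this coordination is a small helper that assembles the Metaflow CLI invocation from Dagster's retry counter. This is a sketch; the helper name and exact argument list are illustrative, not the generated code itself:

```python
def build_step_command(flow_file: str, step_name: str, run_id: str,
                       task_id: str, retry_number: int) -> list:
    """Assemble the Metaflow step subprocess argv, forwarding
    Dagster's context.retry_number as --retry-count so Metaflow's
    current.retry_count matches the Dagster attempt number."""
    return [
        "python", flow_file, "step", step_name,
        "--run-id", run_id,
        "--task-id", task_id,
        "--retry-count", str(retry_number),
    ]
```

On the first retry (`context.retry_number == 1`), the subprocess receives `--retry-count 1`, so both systems agree on the attempt number.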
Timeout Decorator
The @timeout decorator sets a hard wall-clock limit on step execution.
Basic Timeout Example
from metaflow import FlowSpec, step, timeout

class TimeBoundFlow(FlowSpec):

    @timeout(seconds=300)
    @step
    def train(self):
        # This step must complete within 5 minutes
        self.model = train_model(self.data)
        self.next(self.end)

    @step
    def end(self):
        pass
Generated Dagster Code
The timeout is translated to Dagster’s op_execution_timeout tag:
@op(tags={"dagster/op_execution_timeout": "300"})
def op_train(context: OpExecutionContext, upstream: str) -> str:
    # Step execution code
    pass
Timeout Units
The @timeout decorator accepts multiple time units:
@timeout(seconds=30) # 30 seconds
@timeout(minutes=5) # 5 minutes
@timeout(hours=2) # 2 hours
@timeout(hours=1, minutes=30) # 1.5 hours (90 minutes)
All units are converted to total seconds for Dagster.
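The unit conversion can be sketched as follows (a hypothetical helper, assuming the straightforward seconds arithmetic described above):

```python
def timeout_seconds(seconds: int = 0, minutes: int = 0, hours: int = 0) -> int:
    """Collapse @timeout's time units into the single total-seconds
    value used for the dagster/op_execution_timeout tag."""
    return hours * 3600 + minutes * 60 + seconds
```

So `@timeout(hours=1, minutes=30)` becomes a tag value of `"5400"`.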
Combining Retries and Timeouts
You can use both decorators together for robust, time-bound execution:
tests/flows/retry_flow.py
from metaflow import FlowSpec, step, retry, timeout, environment

class RetryFlow(FlowSpec):

    @step
    def start(self):
        self.value = 1
        self.next(self.process)

    @retry(times=3, minutes_between_retries=2)
    @timeout(seconds=120)
    @environment(vars={"MY_VAR": "hello", "OTHER": "world"})
    @step
    def process(self):
        self.result = self.value * 2
        self.next(self.end)

    @step
    def end(self):
        pass
Generated Code for Combined Decorators
@op(
    retry_policy=RetryPolicy(max_retries=3, delay=120),
    tags={"dagster/op_execution_timeout": "120"},
)
def op_process(context: OpExecutionContext, upstream: str) -> str:
    run_id = upstream.split("/")[0]
    task_path = _run_step(
        context, "process", run_id, upstream, "1",
        retry_count=context.retry_number,
        max_user_code_retries=3,
        tags=[],
        extra_env={"MY_VAR": "hello", "OTHER": "world"},
    )
    return task_path
Retry Behavior Details
Delay Between Retries
If minutes_between_retries is specified, Dagster waits that duration before the next attempt:
@retry(times=5, minutes_between_retries=1)
Generates:
RetryPolicy(max_retries=5, delay=60)
If no delay is specified, retries happen immediately:
@retry(times=3) # No delay
Generates:
RetryPolicy(max_retries=3)
Max User Code Retries
The max_user_code_retries parameter is passed to Metaflow, allowing it to handle retries at the task level if needed:
task_path = _run_step(
    context, "process", run_id, upstream, "1",
    retry_count=context.retry_number,
    max_user_code_retries=3,  # Matches retry(times=3)
    tags=[],
)
Timeout Behavior Details
Process Termination
When the timeout is reached, Dagster sends SIGTERM to the Metaflow step subprocess, then waits up to 10 seconds before sending SIGKILL:
def _communicate(proc: subprocess.Popen) -> tuple:
    """Wait for proc to finish, killing it cleanly on interruption."""
    try:
        return proc.communicate()
    except BaseException:
        proc.terminate()  # SIGTERM
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            proc.kill()  # SIGKILL
        proc.wait()
        raise
This gives Metaflow a chance to clean up resources before forced termination.
Timeout Inheritance
If you set a global workflow timeout, individual step timeouts are still respected:
python my_flow.py dagster create my_flow_dagster.py --workflow-timeout 3600
Steps with @timeout(seconds=300) will still timeout after 5 minutes, even if the workflow timeout is 1 hour.
Real-World Example
A production-grade ML training flow:
from metaflow import FlowSpec, step, retry, timeout, resources

class MLTrainingFlow(FlowSpec):

    @step
    def start(self):
        self.dataset_url = "s3://my-bucket/data.parquet"
        self.next(self.load_data)

    @retry(times=3, minutes_between_retries=1)
    @timeout(minutes=10)
    @step
    def load_data(self):
        # Network calls can fail; retry up to 3 times
        import pandas as pd
        self.data = pd.read_parquet(self.dataset_url)
        self.next(self.train)

    @retry(times=2, minutes_between_retries=5)
    @timeout(hours=2)
    @resources(cpu=16, memory=32000, gpu=4)
    @step
    def train(self):
        # Training can fail due to OOM or CUDA errors; retry up to 2 times
        # Must complete within 2 hours
        self.model = train_large_model(self.data)
        self.next(self.save_model)

    @retry(times=3, minutes_between_retries=1)
    @timeout(minutes=5)
    @step
    def save_model(self):
        # S3 uploads can be flaky; retry multiple times
        upload_to_s3(self.model, "s3://models/latest.pkl")
        self.next(self.end)

    @step
    def end(self):
        print("Training complete!")
Each step has tailored retry and timeout settings based on its failure modes and expected duration.
Best Practices
- Set realistic timeouts: use timeouts that are 2-3x your expected execution time to account for system variance.
- Retry transient failures: use retries for network calls, external API requests, and resource allocation failures. Don't retry logical errors in your code.
- Retrying a step re-runs the entire step, including any expensive initialization. If setup is costly, consider moving it outside retry-sensitive sections.
- Retry delays use wall-clock time, not CPU time. A minutes_between_retries=2 delay means 2 minutes of real time, not 2 minutes of compute.
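The timeout-sizing rule of thumb above can be expressed as a small helper (illustrative only):

```python
def suggested_timeout(expected_seconds: int, factor: float = 3.0) -> int:
    """Pad the expected runtime to absorb system variance,
    following the 2-3x rule of thumb."""
    return int(expected_seconds * factor)
```

For a step that typically runs in 100 seconds, `suggested_timeout(100)` recommends a 300-second limit.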
Troubleshooting
Step Keeps Timing Out
- Check the Dagster run logs for the actual execution time
- Increase the timeout value in your flow code
- Re-run dagster create to regenerate the definitions file
Retries Not Working
- Verify the @retry decorator is present in your flow code
- Check that dagster create was run after adding the decorator
- Examine the generated file to confirm retry_policy=RetryPolicy(...) is present on the op
Retry Count Mismatch
If Metaflow’s current.retry_count doesn’t match expectations:
- Ensure you’re using Dagster’s execution (not running Metaflow directly)
- Check that context.retry_number is being passed correctly in the generated _run_step calls