Overview

metaflow-dagster automatically translates Metaflow’s @retry and @timeout decorators into Dagster’s RetryPolicy and op_execution_timeout mechanisms. This ensures your step execution has the same resilience guarantees whether running through Metaflow or Dagster.

Retry Decorator

The @retry decorator on a Metaflow step automatically generates a Dagster RetryPolicy on the corresponding op.

Basic Retry Example

from metaflow import FlowSpec, step, retry

class RobustFlow(FlowSpec):
    @retry(times=3, minutes_between_retries=2)
    @step
    def process(self):
        # This step will retry up to 3 times with 2-minute delays
        result = self.call_external_api()
        self.data = result
        self.next(self.end)
    
    @step
    def end(self):
        pass

Generated Dagster Code

The compiled Dagster op includes the retry policy:
@op(retry_policy=RetryPolicy(max_retries=3, delay=120))
def op_process(context: OpExecutionContext, upstream: str) -> str:
    # Step execution code
    pass

How Retry Count is Forwarded

1. Dagster Tracks Retry Attempts

Dagster’s context.retry_number tracks the current retry attempt (0 for first try, 1 for first retry, etc.).

2. Passed to Metaflow

The retry count is forwarded to Metaflow via the --retry-count flag:
python my_flow.py step process \
  --run-id dagster-abc123 \
  --task-id 1 \
  --retry-count 1

3. Consistent Attempt Numbering

Metaflow sees the same attempt number as Dagster, so retry-aware code behaves consistently: if a step checks current.retry_count, it sees the correct attempt number even when execution is driven by Dagster.
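The forwarding described above can be sketched as a small helper that assembles the Metaflow step invocation. This is an illustrative sketch only; `build_step_command` is a hypothetical name, not part of metaflow-dagster's actual code:

```python
def build_step_command(flow_file, step_name, run_id, task_id, retry_number):
    """Assemble a Metaflow step command, forwarding Dagster's
    context.retry_number as Metaflow's --retry-count flag."""
    return [
        "python", flow_file, "step", step_name,
        "--run-id", run_id,
        "--task-id", str(task_id),
        "--retry-count", str(retry_number),
    ]
```

A retry_number of 0 denotes the first attempt, matching Dagster's numbering, so both systems agree on which attempt is running.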

Timeout Decorator

The @timeout decorator sets a hard wall-clock limit on step execution.

Basic Timeout Example

from metaflow import FlowSpec, step, timeout

class TimeBoundFlow(FlowSpec):
    @timeout(seconds=300)
    @step
    def train(self):
        # This step must complete within 5 minutes
        self.model = train_model(self.data)
        self.next(self.end)
    
    @step
    def end(self):
        pass

Generated Dagster Code

The timeout is translated to Dagster’s op_execution_timeout tag:
@op(tags={"dagster/op_execution_timeout": "300"})
def op_train(context: OpExecutionContext, upstream: str) -> str:
    # Step execution code
    pass

Timeout Units

The @timeout decorator accepts multiple time units:
@timeout(seconds=30)      # 30 seconds
@timeout(minutes=5)       # 5 minutes
@timeout(hours=2)         # 2 hours
@timeout(hours=1, minutes=30)  # 1.5 hours (90 minutes)
All units are converted to total seconds for Dagster.
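The conversion is simple arithmetic; a sketch of the equivalent logic (an illustrative helper, not the library's internal function):

```python
def timeout_to_seconds(seconds=0, minutes=0, hours=0):
    # Collapse the @timeout keyword arguments into the single
    # total-seconds value used for the Dagster timeout tag.
    return hours * 3600 + minutes * 60 + seconds
```

For example, @timeout(hours=1, minutes=30) yields 5400 seconds, which appears as the string "5400" in the generated op's tags.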

Combining Retries and Timeouts

You can use both decorators together for robust, time-bound execution:
tests/flows/retry_flow.py
from metaflow import FlowSpec, step, retry, timeout, environment


class RetryFlow(FlowSpec):
    @step
    def start(self):
        self.value = 1
        self.next(self.process)

    @retry(times=3, minutes_between_retries=2)
    @timeout(seconds=120)
    @environment(vars={"MY_VAR": "hello", "OTHER": "world"})
    @step
    def process(self):
        self.result = self.value * 2
        self.next(self.end)

    @step
    def end(self):
        pass

Generated Code for Combined Decorators

@op(
    retry_policy=RetryPolicy(max_retries=3, delay=120),
    tags={"dagster/op_execution_timeout": "120"}
)
def op_process(context: OpExecutionContext, upstream: str) -> str:
    run_id = upstream.split("/")[0]
    task_path = _run_step(
        context, "process", run_id, upstream, "1",
        retry_count=context.retry_number,
        max_user_code_retries=3,
        tags=[],
        extra_env={"MY_VAR": "hello", "OTHER": "world"},
    )
    return task_path

Retry Behavior Details

Delay Between Retries

If minutes_between_retries is specified, Dagster waits that duration before the next attempt:
@retry(times=5, minutes_between_retries=1)
Generates:
RetryPolicy(max_retries=5, delay=60)
If no delay is specified, retries happen immediately:
@retry(times=3)  # No delay
Generates:
RetryPolicy(max_retries=3)
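The mapping from @retry arguments to RetryPolicy arguments can be summarized as follows (an illustrative sketch of the translation, not the library's actual code; note the minutes-to-seconds conversion for delay):

```python
def retry_policy_kwargs(times, minutes_between_retries=None):
    # Map @retry arguments onto Dagster RetryPolicy keyword
    # arguments; delay is converted from minutes to seconds.
    kwargs = {"max_retries": times}
    if minutes_between_retries:
        kwargs["delay"] = minutes_between_retries * 60
    return kwargs
```

When no delay is given, the delay key is omitted entirely, so Dagster applies its default of retrying immediately.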

Max User Code Retries

The max_user_code_retries parameter is passed to Metaflow, allowing it to handle retries at the task level if needed:
task_path = _run_step(
    context, "process", run_id, upstream, "1",
    retry_count=context.retry_number,
    max_user_code_retries=3,  # Matches retry(times=3)
    tags=[],
)

Timeout Behavior Details

Process Termination

When the timeout is reached, Dagster sends SIGTERM to the Metaflow step subprocess, then waits up to 10 seconds before sending SIGKILL:
import subprocess

def _communicate(proc: subprocess.Popen) -> tuple:
    """Wait for proc to finish, killing it cleanly on interruption."""
    try:
        return proc.communicate()
    except BaseException:
        proc.terminate()  # SIGTERM
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            proc.kill()  # SIGKILL
            proc.wait()
        raise
This gives Metaflow a chance to clean up resources before forced termination.

Timeout Inheritance

If you set a global workflow timeout, individual step timeouts are still respected:
python my_flow.py dagster create my_flow_dagster.py --workflow-timeout 3600
Steps with @timeout(seconds=300) will still timeout after 5 minutes, even if the workflow timeout is 1 hour.
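The interaction amounts to the tighter limit firing first. A minimal sketch of that rule, assuming both limits are expressed in seconds (illustrative only, not the library's implementation):

```python
def effective_timeout(step_timeout=None, workflow_timeout=None):
    # Whichever limit is tighter fires first: a per-step @timeout
    # of 300s cuts execution off well before a 3600s workflow limit.
    limits = [t for t in (step_timeout, workflow_timeout) if t is not None]
    return min(limits) if limits else None
```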

Real-World Example

A production-grade ML training flow:
from metaflow import FlowSpec, step, retry, timeout, resources

class MLTrainingFlow(FlowSpec):
    @step
    def start(self):
        self.dataset_url = "s3://my-bucket/data.parquet"
        self.next(self.load_data)
    
    @retry(times=3, minutes_between_retries=1)
    @timeout(minutes=10)
    @step
    def load_data(self):
        # Network calls can fail; retry up to 3 times
        import pandas as pd
        self.data = pd.read_parquet(self.dataset_url)
        self.next(self.train)
    
    @retry(times=2, minutes_between_retries=5)
    @timeout(hours=2)
    @resources(cpu=16, memory=32000, gpu=4)
    @step
    def train(self):
        # Training can fail due to OOM or CUDA errors; retry up to twice
        # Must complete within 2 hours
        self.model = train_large_model(self.data)
        self.next(self.save_model)
    
    @retry(times=3, minutes_between_retries=1)
    @timeout(minutes=5)
    @step
    def save_model(self):
        # S3 uploads can be flaky; retry multiple times
        upload_to_s3(self.model, "s3://models/latest.pkl")
        self.next(self.end)
    
    @step
    def end(self):
        print("Training complete!")
Each step has tailored retry and timeout settings based on its failure modes and expected duration.

Best Practices

Set realistic timeouts: Use timeouts that are 2-3x your expected execution time to account for system variance.
Retry transient failures: Use retries for network calls, external API requests, and resource allocation failures. Don’t retry logical errors in your code.
Retrying a step re-runs the entire step, including any expensive initialization. Consider moving setup code outside retry-sensitive sections if initialization is costly.
Retry delays use wall-clock time, not CPU time. A minutes_between_retries=2 delay means 2 minutes of real time, not 2 minutes of compute.

Troubleshooting

Step Keeps Timing Out

  1. Check the Dagster run logs for the actual execution time
  2. Increase the timeout value in your flow code
  3. Re-run dagster create to regenerate the definitions file

Retries Not Working

  1. Verify the @retry decorator is present in your flow code
  2. Check that dagster create was run after adding the decorator
  3. Examine the generated file to confirm retry_policy=RetryPolicy(...) is present on the op

Retry Count Mismatch

If Metaflow’s current.retry_count doesn’t match expectations:
  1. Ensure you’re using Dagster’s execution (not running Metaflow directly)
  2. Check that context.retry_number is being passed correctly in the generated _run_step calls