Decorators enhance the behavior of flows and steps. Metaflow provides decorators for resource allocation, error handling, execution environments, and more.
Decorator Types
Metaflow has two types of decorators:
Step Decorators
Applied to individual steps to modify their execution:
@batch(cpu=8, memory=32000)
@retry(times=3)
@step
def train_model(self):
    pass
Flow Decorators
Applied to the entire flow class:
@project(name='recommendation_system')
@schedule(weekly=True)
class MyFlow(FlowSpec):
    pass
Decorator Placement
Critical: The @step decorator must be placed immediately before the method definition. All other decorators go above @step:
# ✅ Correct
@batch
@retry
@step
def my_step(self):
    pass
# ❌ Wrong - will fail
@step
@batch
def my_step(self):
    pass
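The placement rule can be illustrated with a toy model in plain Python. This is a minimal sketch, not Metaflow's implementation: `toy_step` and `toy_batch` are hypothetical stand-ins for @step and @batch, and the check shown is only one plausible way such a rule could be enforced.

```python
# Toy model of the placement rule; NOT Metaflow's real implementation.
# toy_step and toy_batch are hypothetical stand-ins for @step and @batch.

def toy_step(func):
    """Mark a function as a step (stand-in for @step)."""
    func.is_step = True
    func.decorators = []
    return func


def toy_batch(func):
    """Attach a decorator record, but only to functions already marked as steps."""
    if not getattr(func, "is_step", False):
        raise TypeError(
            f"toy_batch applied to {func.__name__!r} before toy_step; "
            "move toy_step directly above the def"
        )
    func.decorators.append("toy_batch")
    return func


# Correct: toy_step sits next to the def, so it runs first.
@toy_batch
@toy_step
def good_step():
    pass


# Wrong: toy_batch runs first and sees an unmarked function.
try:
    @toy_step
    @toy_batch
    def bad_step():
        pass
    placement_error = None
except TypeError as exc:
    placement_error = str(exc)

print(good_step.decorators)
print(placement_error)
```

Because Python applies the decorator closest to the `def` first, the marker has to sit at the bottom of the stack for any check of this kind to pass.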
Common Step Decorators
@batch - AWS Batch Execution
Run steps on AWS Batch for scalable compute:
@batch(cpu=8, memory=32000, image='my-docker-image:latest')
@step
def train_model(self):
    # Runs on AWS Batch with specified resources
    pass
Key attributes:
cpu: Number of CPUs (default: 1)
memory: Memory in MB (default: 4096)
gpu: Number of GPUs (default: 0)
image: Docker image to use
queue: Batch queue name
@retry - Error Handling
Automatically retry steps on failure:
@retry(times=3)
@step
def flaky_api_call(self):
    import requests  # third-party HTTP client; must be installed
    # Retries up to 3 times on failure
    response = requests.get('https://api.example.com/data')
    self.data = response.json()
    self.next(self.process)
Key attributes:
times: Number of retry attempts (default: 3)
minutes_between_retries: Minutes to wait between attempts (default: 2)
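The retry behavior can be sketched in plain Python. This is an illustration only, assuming that `times` counts retries after the first attempt, so `times=3` allows up to four attempts in total. The helper `run_with_retries` and the `flaky` function are hypothetical and are not part of Metaflow.

```python
import time


def run_with_retries(func, times=3, wait_seconds=0.0):
    """Call func(); on failure, retry up to `times` more times.

    Returns a (result, attempts) tuple. Hypothetical helper, not a Metaflow API.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return func(), attempts
        except Exception:
            if attempts > times:
                raise  # retries exhausted: surface the last error
            time.sleep(wait_seconds)


calls = {"count": 0}


def flaky():
    """Fail on the first two calls, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result, attempts = run_with_retries(flaky, times=3)
print(result, attempts)
```

Retries suit transient failures such as network errors. A step that fails deterministically will simply fail again on every attempt.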
@timeout - Time Limits
Set maximum execution time:
@timeout(hours=2)
@step
def long_task(self):
    # Fails if execution exceeds 2 hours
    pass
Key attributes:
seconds: Timeout in seconds
minutes: Timeout in minutes
hours: Timeout in hours
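When more than one unit is given, a reasonable reading is that the values add up. The sketch below shows that arithmetic under this assumption. `total_timeout_seconds` is a hypothetical helper for illustration, not a Metaflow function.

```python
def total_timeout_seconds(seconds=0, minutes=0, hours=0):
    """Combine the three units into one limit, assuming they are additive."""
    total = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
    if total <= 0:
        raise ValueError("timeout must be a positive duration")
    return total


two_hours = total_timeout_seconds(hours=2)
ninety_minutes = total_timeout_seconds(hours=1, minutes=30)
print(two_hours, ninety_minutes)
```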
@resources - Resource Allocation
Specify CPU and memory requirements:
@resources(cpu=16, memory=64000)
@step
def memory_intensive(self):
    # Requests 16 CPUs and 64 GB of RAM
    pass
@conda - Python Environments
Specify package dependencies:
@conda(libraries={'scikit-learn': '1.0.2', 'pandas': '1.3.0'})
@step
def analyze(self):
    import sklearn
    import pandas as pd
    # Uses specified package versions
    pass
@catch - Exception Handling
Handle exceptions without failing the flow:
@catch(var='exception_info')
@step
def risky_operation(self):
    # If this fails, the exception is stored in self.exception_info
    # and the flow continues
    result = 1 / 0
    self.next(self.end)
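The idea of storing the exception instead of failing can be modeled in plain Python. The sketch below is a simplified illustration, not Metaflow's implementation: `catch_into` and `ToyFlow` are hypothetical names, and the toy assumes the variable is set to None when no exception occurs.

```python
import functools


def catch_into(var):
    """Store any exception raised by the method on self.<var> instead of raising."""
    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            setattr(self, var, None)  # assume "no error" until proven otherwise
            try:
                return method(self, *args, **kwargs)
            except Exception as exc:
                setattr(self, var, exc)
                return None
        return wrapper
    return decorator


class ToyFlow:
    """Hypothetical stand-in for a flow with two steps."""

    @catch_into("exception_info")
    def risky_operation(self):
        self.result = 1 / 0

    def end(self):
        # A later step inspects the stored exception and decides what to do.
        if self.exception_info is not None:
            return f"recovered from {type(self.exception_info).__name__}"
        return "no error"


flow = ToyFlow()
flow.risky_operation()
summary = flow.end()
print(summary)
```

A downstream step should always check the stored variable, since anything the failing step meant to produce (here, `self.result`) will be missing.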
Flow Decorators
@project - Project Organization
Organize flows into projects:
@project(name='recommendation_system')
class TrainingFlow(FlowSpec):
    pass
@schedule - Production Scheduling
Schedule flows for production:
@schedule(daily=True, timezone='America/Los_Angeles')
class DailyReportFlow(FlowSpec):
    pass
Scheduling options:
daily: Run once per day
weekly: Run once per week
hourly: Run once per hour
cron: Cron expression for custom schedules
Custom Decorators
You can create custom decorators for steps:
from metaflow import UserStepDecorator

class MyDecorator(UserStepDecorator):
    name = 'my_decorator'
    defaults = {'param': 'default_value'}

    def task_pre_step(self, step_name, task_datastore, meta,
                      run_id, task_id, flow, graph, retry_count,
                      max_user_code_retries, ubf_context, inputs):
        print(f"Before step {step_name}")

    def task_post_step(self, step_name, flow, graph,
                       retry_count, max_user_code_retries):
        print(f"After step {step_name}")

# Use it
@my_decorator(param='custom_value')
@step
def my_step(self):
    pass
Decorator Attributes
Access decorator configuration:
from metaflow import FlowSpec, batch, step

class MyFlow(FlowSpec):
    @batch(cpu=8)
    @step
    def start(self):
        # Access decorator attributes
        for deco in self.start.decorators:
            print(f"Decorator: {deco.name}")
            print(f"Attributes: {deco.attributes}")
        self.next(self.end)

    @step
    def end(self):
        pass
Multiple Decorators
Stack multiple decorators on a step:
@batch(cpu=8, memory=32000)
@retry(times=3)
@timeout(hours=2)
@conda(libraries={'tensorflow': '2.10.0'})
@step
def train_model(self):
    # Combines all decorator behaviors
    pass
Python applies stacked decorators from bottom to top, so @step is applied first.
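The bottom-to-top order is standard Python behavior and can be observed directly. In this small sketch, `tag` is a hypothetical decorator factory that only records the order in which each decorator is applied. The labels mirror the stack above but are plain strings, not Metaflow decorators.

```python
applied = []


def tag(name):
    """Return a decorator that records when it is applied."""
    def decorator(func):
        applied.append(name)
        return func
    return decorator


@tag("batch")
@tag("retry")
@tag("timeout")
@tag("step")
def train_model():
    pass


print(applied)  # ['step', 'timeout', 'retry', 'batch']
```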
Decorator Lifecycle
Decorators hook into various points in step execution:
step_init: Called when decorators are initialized (before the run starts).
runtime_task_created: Called when a task is created.
task_pre_step: Called before the step function runs.
task_post_step: Called after successful step execution.
task_exception: Called if the step raises an exception.
task_finished: Called after task finalization.
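The order of the task-level hooks can be sketched with a toy runner. This is not Metaflow's runtime. The names task_pre_step and task_post_step follow the methods shown in the custom decorator example, while task_exception and task_finished, along with the simplified one-argument signatures, are assumptions of this sketch.

```python
# Toy model of the hook order; NOT Metaflow's runtime.
# Hook signatures are simplified for illustration.

class TracingDecorator:
    """Records which hooks fire, in order."""

    def __init__(self):
        self.events = []

    def task_pre_step(self, step_name):
        self.events.append(f"pre:{step_name}")

    def task_post_step(self, step_name):
        self.events.append(f"post:{step_name}")

    def task_exception(self, step_name, exc):
        self.events.append(f"exception:{step_name}:{type(exc).__name__}")

    def task_finished(self, step_name):
        self.events.append(f"finished:{step_name}")


def run_task(deco, step_name, step_func):
    """Run step_func, firing hooks around it. Returns True on success."""
    deco.task_pre_step(step_name)
    try:
        step_func()
    except Exception as exc:
        deco.task_exception(step_name, exc)
        succeeded = False  # swallowed here only to keep the demo short
    else:
        deco.task_post_step(step_name)
        succeeded = True
    deco.task_finished(step_name)
    return succeeded


def failing_step():
    raise RuntimeError("step failed")


ok_trace = TracingDecorator()
run_task(ok_trace, "start", lambda: None)

bad_trace = TracingDecorator()
run_task(bad_trace, "start", failing_step)

print(ok_trace.events)
print(bad_trace.events)
```

In this model the post-step hook and the exception hook are mutually exclusive, and the finalization hook fires on both paths.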
Base Decorator Classes
StepDecorator
Base class for all step decorators:
class StepDecorator(Decorator):
    name = "NONAME"
    defaults = {}
    allow_multiple = False
FlowDecorator
Base class for flow decorators:
class FlowDecorator(Decorator):
    options = {}
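One way to picture how `name` and `defaults` work together is a toy base class that merges user-supplied attributes over the defaults and rejects unknown keys. This is a sketch under that assumption, not the actual Metaflow base class. `ToyDecorator` and `ToyBatch` are hypothetical, and the default values are taken from the @batch attribute list above.

```python
class ToyDecorator:
    """Hypothetical base class: merges user attributes over class defaults."""

    name = "NONAME"
    defaults = {}

    def __init__(self, **attributes):
        # Start from a copy so the class-level defaults are never mutated.
        self.attributes = dict(self.defaults)
        for key, value in attributes.items():
            if key not in self.defaults:
                raise ValueError(
                    f"Decorator '{self.name}' does not support attribute '{key}'"
                )
            self.attributes[key] = value


class ToyBatch(ToyDecorator):
    name = "batch"
    defaults = {"cpu": 1, "memory": 4096, "gpu": 0}


deco = ToyBatch(cpu=8)
print(deco.name, deco.attributes)

try:
    ToyBatch(cpus=8)  # typo in the attribute name
    unknown_error = None
except ValueError as exc:
    unknown_error = str(exc)
print(unknown_error)
```

Declaring every supported attribute in `defaults` lets a typo surface as an error at construction time rather than being silently ignored.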
Best Practices
Start simple: Use built-in decorators before creating custom ones. They handle most common use cases.
Order matters: Place resource decorators (@batch, @resources) above environment decorators (@conda, @pypi) for best results.
Test locally: Many decorators have a local execution mode. Test your flow locally before running on remote compute.
Some decorators are mutually exclusive. For example, you can’t use both @batch and @kubernetes on the same step.