Overview
The @trigger decorator makes a flow event-driven, so that it executes automatically when specified events occur. This enables reactive workflows and flow chaining.
This is a flow-level decorator for building event-driven architectures. It is supported on AWS Step Functions (via EventBridge) and Argo Workflows (via Argo Events).
Basic Usage
```python
from metaflow import FlowSpec, step, trigger

@trigger(event='data-ready')
class ProcessingFlow(FlowSpec):

    @step
    def start(self):
        from metaflow import current
        # Access trigger information
        if current.trigger:
            print(f"Triggered by: {current.trigger.event_name}")
            print(f"Payload: {current.trigger.payload}")
        self.next(self.end)

    @step
    def end(self):
        pass
```
Parameters
event
Union[str, dict, Callable]
Single event dependency. Can be:
String: event name
Dict: event specification with parameter mappings
Callable: function returning an event specification at deploy time
events
List[Union[str, dict, Callable]]
default: []
Multiple event dependencies. Each element can be a string, dict, or callable.
options
Dict[str, Any]
default: {}
Backend-specific configuration for tuning eventing behavior.
Event Specifications
Simple Event Name
```python
@trigger(event='user-signup')
class WelcomeEmailFlow(FlowSpec):
    pass
```
Multiple Events
```python
# The flow triggers once all listed events have been received
@trigger(events=['data-updated', 'manual-trigger', 'scheduled-run'])
class ProcessingFlow(FlowSpec):
    pass
```
Event with Parameter Mapping
Map event payload fields to flow parameters:
```python
from metaflow import FlowSpec, Parameter, step, trigger

@trigger(event={
    'name': 'user-signup',
    'parameters': {
        'user_id': 'userId',   # flow_param: event_field
        'email': 'userEmail'
    }
})
class UserOnboarding(FlowSpec):
    user_id = Parameter('user_id', required=True)
    email = Parameter('email', required=True)

    @step
    def start(self):
        print(f"Onboarding user {self.user_id} ({self.email})")
        self.next(self.end)

    @step
    def end(self):
        pass
```
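The mapping above pairs each flow parameter with a field in the event payload. As a rough, hypothetical sketch of what that lookup implies (illustrative only, not Metaflow's internals), the mapping `{'user_id': 'userId', 'email': 'userEmail'}` behaves like:

```python
# Hypothetical sketch of the {flow_param: event_field} lookup;
# not Metaflow's actual implementation.
def map_payload_to_parameters(payload: dict, mapping: dict) -> dict:
    # Pick each named event field out of the payload and rename it
    # to the corresponding flow parameter.
    return {flow_param: payload[event_field]
            for flow_param, event_field in mapping.items()}

payload = {'userId': 'u-123', 'userEmail': 'user@example.com', 'source': 'web'}
mapping = {'user_id': 'userId', 'email': 'userEmail'}
print(map_payload_to_parameters(payload, mapping))
# {'user_id': 'u-123', 'email': 'user@example.com'}
```

Note that payload fields not named in the mapping (here, `source`) are simply ignored by the parameter binding.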
Short Parameter Syntax
```python
@trigger(event={
    'name': 'data-ready',
    'parameters': [
        'dataset_id',                   # same name in event and flow
        ('flow_param', 'event_field')   # different names: (flow_param, event_field)
    ]
})
class DataProcessor(FlowSpec):
    dataset_id = Parameter('dataset_id')
    flow_param = Parameter('flow_param')
```
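In the list form, a bare string means "same name on both sides," while a tuple renames an event field to a flow parameter. A hypothetical normalization of the short syntax into the explicit dict form (illustrative sketch, not Metaflow's code):

```python
def normalize_parameter_list(params):
    """Expand the short list syntax into an explicit
    {flow_param: event_field} mapping (illustrative sketch)."""
    mapping = {}
    for p in params:
        if isinstance(p, str):
            mapping[p] = p                  # same name in event and flow
        else:
            flow_param, event_field = p     # (flow_param, event_field) pair
            mapping[flow_param] = event_field
    return mapping

print(normalize_parameter_list(['dataset_id', ('flow_param', 'event_field')]))
# {'dataset_id': 'dataset_id', 'flow_param': 'event_field'}
```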
Deploy-Time Event Resolution
Use namespaced_event_name for project-aware event names:
```python
from metaflow import FlowSpec, step, trigger, project
from metaflow.plugins.namespaced_events import namespaced_event_name

@project(name='ml-platform')
@trigger(event=namespaced_event_name('model-trained'))
class DeployModel(FlowSpec):
    # Event name resolves to: ml-platform.production.model-trained
    pass
```
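Judging by the resolved name shown in the comment above, the namespacing presumably joins project, branch, and event name with dots. A hedged sketch of that composition (the real helper's logic may differ):

```python
def resolve_namespaced_event(project: str, branch: str, event: str) -> str:
    # Compose '<project>.<branch>.<event>' as suggested by the resolution
    # shown above; hypothetical helper, not the actual implementation.
    return f"{project}.{branch}.{event}"

print(resolve_namespaced_event('ml-platform', 'production', 'model-trained'))
# ml-platform.production.model-trained
```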
Dynamic Event Configuration
```python
from typing import Union
from metaflow import FlowSpec, trigger, ParameterContext

def get_trigger_config(ctx: ParameterContext) -> Union[dict, str]:
    # Access deploy-time context
    if ctx.user == 'prod-deployer':
        return {
            'name': 'prod-event',
            'parameters': {'env': 'production'}
        }
    else:
        return 'dev-event'

@trigger(event=get_trigger_config)
class AdaptiveFlow(FlowSpec):
    pass
```
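Because the callable runs at deploy time, its branching can be exercised in isolation with a stand-in context object. A minimal sketch, mirroring the function above but using SimpleNamespace in place of the real ParameterContext:

```python
from types import SimpleNamespace

def resolve_event_config(ctx) -> 'dict | str':
    # Same branching as the deploy-time callable above: richer config
    # for prod deployers, a plain event name otherwise.
    if ctx.user == 'prod-deployer':
        return {'name': 'prod-event', 'parameters': {'env': 'production'}}
    return 'dev-event'

# Stand-ins for the deploy-time context (only .user is needed here)
prod_ctx = SimpleNamespace(user='prod-deployer')
dev_ctx = SimpleNamespace(user='alice')

print(resolve_event_config(prod_ctx))
# {'name': 'prod-event', 'parameters': {'env': 'production'}}
print(resolve_event_config(dev_ctx))
# dev-event
```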
Runtime Trigger Information
Use current.trigger to access trigger details at runtime:
```python
from metaflow import FlowSpec, step, trigger, current

@trigger(event='data-updated')
class EventProcessor(FlowSpec):

    @step
    def start(self):
        if current.trigger:
            # Event that triggered this run
            event_name = current.trigger.event_name
            # Full event payload
            payload = current.trigger.payload
            # Timestamp
            timestamp = current.trigger.timestamp
            print(f"Triggered by {event_name} at {timestamp}")
            print(f"Payload: {payload}")
        else:
            print("Not triggered by an event (manual run)")
        self.next(self.end)

    @step
    def end(self):
        pass
```
Flow Chaining with @trigger_on_finish
Create flow dependencies using @trigger_on_finish:
```python
from metaflow import FlowSpec, step, trigger_on_finish

# Upstream flow
class DataIngestion(FlowSpec):

    @step
    def start(self):
        self.data = fetch_data()  # fetch_data() is a placeholder
        self.next(self.end)

    @step
    def end(self):
        pass

# Downstream flow - runs when DataIngestion completes
@trigger_on_finish(flow='DataIngestion')
class DataTransform(FlowSpec):

    @step
    def start(self):
        from metaflow import Flow
        # Access the latest successful upstream run
        upstream_run = Flow('DataIngestion').latest_successful_run
        self.data = upstream_run['end'].task.data.data
        self.next(self.end)

    @step
    def end(self):
        pass
```
Complex Flow Dependencies
```python
# Run when all listed flows have finished
@trigger_on_finish(flows=['FlowA', 'FlowB'])
class Aggregator(FlowSpec):
    pass

# Run only when FlowA completes successfully
@trigger_on_finish(flow='FlowA', match='success')
class SuccessHandler(FlowSpec):
    pass
```
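When a downstream flow depends on multiple upstream flows, the scheduler has to track which of them have completed before firing. A toy model of that bookkeeping (illustrative only, not Metaflow internals):

```python
class FinishGate:
    """Toy model of an all-upstreams-finished gate; not Metaflow internals."""

    def __init__(self, required):
        self.required = set(required)
        self.finished = set()

    def record(self, flow_name):
        # Returns True once every required flow has reported completion.
        self.finished.add(flow_name)
        return self.required <= self.finished

gate = FinishGate(['FlowA', 'FlowB'])
print(gate.record('FlowA'))  # False: FlowB still pending
print(gate.record('FlowB'))  # True: all upstreams finished
```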
Examples
Data Pipeline
```python
from metaflow import Flow, FlowSpec, step, trigger_on_finish

# ingest(), process(), and analyze() are placeholders for your own logic

# Step 1: Ingest
class DataIngest(FlowSpec):

    @step
    def start(self):
        self.raw_data = ingest()
        self.next(self.end)

    @step
    def end(self):
        pass

# Step 2: Process (triggered by ingest)
@trigger_on_finish(flow='DataIngest')
class DataProcess(FlowSpec):

    @step
    def start(self):
        upstream = Flow('DataIngest').latest_run
        self.processed = process(upstream.data.raw_data)
        self.next(self.end)

    @step
    def end(self):
        pass

# Step 3: Analyze (triggered by process)
@trigger_on_finish(flow='DataProcess')
class DataAnalyze(FlowSpec):

    @step
    def start(self):
        upstream = Flow('DataProcess').latest_run
        analyze(upstream.data.processed)
        self.next(self.end)

    @step
    def end(self):
        pass
```
Fan-Out Processing
```python
from metaflow import FlowSpec, Parameter, step, trigger

# Main flow emits events
class MainPipeline(FlowSpec):

    @step
    def start(self):
        # Emit custom event (emit_event is a placeholder for your
        # eventing client)
        emit_event('pipeline-complete', {
            'dataset': 'users',
            'records': 1000
        })
        self.next(self.end)

    @step
    def end(self):
        pass

# Multiple flows react to the event
@trigger(event={
    'name': 'pipeline-complete',
    'parameters': {'dataset': 'dataset'}
})
class EmailReport(FlowSpec):
    dataset = Parameter('dataset')

    @step
    def start(self):
        send_email_report(self.dataset)
        self.next(self.end)

    @step
    def end(self):
        pass

@trigger(event={
    'name': 'pipeline-complete',
    'parameters': {'records': 'records'}
})
class UpdateMetrics(FlowSpec):
    records = Parameter('records')

    @step
    def start(self):
        update_dashboard(self.records)
        self.next(self.end)

    @step
    def end(self):
        pass
```
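The fan-out above amounts to one event payload feeding several parameter mappings, with each subscriber flow seeing only the fields its mapping names. A self-contained sketch of that dispatch (hypothetical helper, not Metaflow code):

```python
# Each subscriber flow declares which payload fields it consumes,
# mirroring the 'parameters' mappings above (illustrative only).
subscribers = {
    'EmailReport':   {'dataset': 'dataset'},
    'UpdateMetrics': {'records': 'records'},
}

event_payload = {'dataset': 'users', 'records': 1000}

def fan_out(payload, subs):
    # Build the parameter set each downstream flow would receive.
    return {flow: {p: payload[f] for p, f in mapping.items()}
            for flow, mapping in subs.items()}

print(fan_out(event_payload, subscribers))
# {'EmailReport': {'dataset': 'users'}, 'UpdateMetrics': {'records': 1000}}
```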
Deployment
AWS Step Functions with EventBridge
```shell
# Deploy trigger-based flow
python flow.py step-functions create
# EventBridge rules are automatically configured
```
Argo Workflows with Argo Events
```shell
# Deploy trigger-based flow
python flow.py argo-workflows create
# EventSource and Sensor are created automatically
```
Best Practices
Handle missing triggers gracefully
Always check whether current.trigger exists:

```python
@step
def start(self):
    if current.trigger:
        data = current.trigger.payload
    else:
        # Handle manual runs
        data = get_default_data()
```

Use explicit parameter mappings
Map event fields to parameters for type safety:

```python
@trigger(event={
    'name': 'order-placed',
    'parameters': {
        'order_id': 'orderId',
        'amount': 'totalAmount'
    }
})
class ProcessOrder(FlowSpec):
    order_id = Parameter('order_id', type=str)
    amount = Parameter('amount', type=float)
```

Test event-driven flows locally
Test flows manually without events:

```shell
# Run without trigger
python flow.py run --order_id=123 --amount=99.99
```

Use namespaced events for projects
Combine with @project for organized event namespaces:

```python
@project(name='ml-platform')
@trigger(event=namespaced_event_name('training-complete'))
```
Related Documentation
- Event Triggering Guide: complete guide to event-driven workflows
- @project Decorator: project namespacing for events
- AWS Step Functions: deploy event-driven flows to AWS
- Argo Workflows: deploy event-driven flows to Argo