Skip to main content

Overview

The @trigger decorator makes a flow event-driven, allowing it to automatically execute when specific events occur. This enables reactive workflows and flow chaining.
This is a flow-level decorator for creating event-driven architectures. Supported by AWS Step Functions (with EventBridge) and Argo Workflows (with Argo Events).

Basic Usage

from metaflow import FlowSpec, step, trigger

@trigger(event='data-ready')
class ProcessingFlow(FlowSpec):
    """Flow that is started by the 'data-ready' event."""

    @step
    def start(self):
        """Report the triggering event, if any, then continue to `end`."""
        from metaflow import current

        # current.trigger is falsy when the run was not started by an event.
        trigger_info = current.trigger
        if trigger_info:
            print(f"Triggered by: {trigger_info.event_name}")
            print(f"Payload: {trigger_info.payload}")

        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

Parameters

event
Union[str, dict, Callable]
A single event that the flow depends on. It can be one of:
  • String: Event name
  • Dict: Event specification with parameter mappings
  • Callable: Function returning event specification at deploy time
events
List[Union[str, dict, Callable]]
default:"[]"
Multiple event dependencies. Each element can be a string, dict, or callable.
options
dict
default:"{}"
Backend-specific configuration for tuning eventing behavior.

Event Specifications

Simple Event Name

@trigger(event='user-signup')
class WelcomeEmailFlow(FlowSpec):
    """Flow triggered by the 'user-signup' event (steps omitted)."""

Multiple Events

@trigger(
    events=[
        'data-updated',
        'manual-trigger',
        'scheduled-run',
    ]
)
class ProcessingFlow(FlowSpec):
    """Flow that declares several event dependencies (steps omitted)."""

Event with Parameter Mapping

Map event payload fields to flow parameters:
from metaflow import FlowSpec, Parameter, step, trigger

@trigger(
    event={
        'name': 'user-signup',
        # Each entry reads "flow parameter: event payload field".
        'parameters': {'user_id': 'userId', 'email': 'userEmail'},
    }
)
class UserOnboarding(FlowSpec):
    """Flow whose parameters are filled from the 'user-signup' event payload."""

    # Populated from the event field 'userId'.
    user_id = Parameter('user_id', required=True)
    # Populated from the event field 'userEmail'.
    email = Parameter('email', required=True)

    @step
    def start(self):
        """Announce which user is being onboarded."""
        message = f"Onboarding user {self.user_id} ({self.email})"
        print(message)
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

Short Parameter Syntax

@trigger(
    event={
        'name': 'data-ready',
        'parameters': [
            # A bare string means the event field and flow parameter share a name.
            'dataset_id',
            # A (flow parameter, event field) pair maps differently named fields.
            ('flow_param', 'event_field'),
        ],
    }
)
class DataProcessor(FlowSpec):
    """Flow using the list form of the parameter mapping."""

    dataset_id = Parameter('dataset_id')
    flow_param = Parameter('flow_param')

Deploy-Time Event Resolution

Use namespaced_event_name for project-aware event names:
from metaflow import FlowSpec, step, trigger, project
from metaflow.plugins.namespaced_events import namespaced_event_name

@project(name='ml-platform')
@trigger(event=namespaced_event_name('model-trained'))
class DeployModel(FlowSpec):
    """Flow triggered by a project-namespaced event (steps omitted).

    Event name resolves to: ml-platform.production.model-trained
    """

Dynamic Event Configuration

from metaflow import ParameterContext

def get_trigger_config(ctx: ParameterContext) -> "str | dict":
    """Choose the event specification at deploy time.

    Args:
        ctx: Deploy-time context supplied by Metaflow.

    Returns:
        A dict event specification when deployed by 'prod-deployer',
        otherwise the plain event name 'dev-event'.
    """
    # ParameterContext exposes the deploying user as `user_name`.
    if ctx.user_name == 'prod-deployer':
        return {
            'name': 'prod-event',
            # Mapping is "flow parameter: event field": the flow parameter
            # 'env' is read from the event payload field named 'production'.
            'parameters': {'env': 'production'},
        }
    return 'dev-event'

@trigger(event=get_trigger_config)
class AdaptiveFlow(FlowSpec):
    """Flow whose event specification comes from `get_trigger_config` (steps omitted)."""

Accessing Trigger Information

Use current.trigger to access trigger details at runtime:
from metaflow import FlowSpec, step, trigger, current

@trigger(event='data-updated')
class EventProcessor(FlowSpec):
    """Flow that inspects the event that started it."""

    @step
    def start(self):
        """Print trigger details, or note that the run was started manually."""
        if not current.trigger:
            print("Not triggered by an event (manual run)")
        else:
            # Name of the event that triggered this run
            source_event = current.trigger.event_name

            # Complete payload delivered with the event
            body = current.trigger.payload

            # When the event occurred
            occurred_at = current.trigger.timestamp

            print(f"Triggered by {source_event} at {occurred_at}")
            print(f"Payload: {body}")

        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

Flow Chaining with @trigger_on_finish

Create flow dependencies using @trigger_on_finish:
from metaflow import FlowSpec, step, trigger_on_finish

# Upstream flow
class DataIngestion(FlowSpec):
    """Upstream flow that fetches data and stores it as the `data` artifact."""

    @step
    def start(self):
        """Fetch the data and persist it on the run."""
        fetched = fetch_data()
        self.data = fetched
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

# Downstream flow - runs when DataIngestion completes
@trigger_on_finish(flow='DataIngestion')
class DataTransform(FlowSpec):
    """Downstream flow that reads the `data` artifact of DataIngestion."""

    @step
    def start(self):
        """Copy the upstream `data` artifact into this run."""
        from metaflow import Flow

        # Look up the most recent successful upstream run, then read the
        # artifact stored by the task of its 'end' step.
        finished_run = Flow('DataIngestion').latest_successful_run
        end_task = finished_run['end'].task
        self.data = end_task.data.data

        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

Complex Flow Dependencies

# Run once both FlowA and FlowB have finished
@trigger_on_finish(flows=['FlowA', 'FlowB'])
class Aggregator(FlowSpec):
    """Downstream flow that depends on FlowA and FlowB (steps omitted)."""

# Run only when FlowA completes successfully
@trigger_on_finish(
    flow='FlowA',
    match='success',
)
class SuccessHandler(FlowSpec):
    """Downstream flow tied to FlowA with match='success' (steps omitted)."""

Examples

Data Pipeline

# Step 1: Ingest
class DataIngest(FlowSpec):
    """First pipeline stage: stores ingested data as the `raw_data` artifact."""

    @step
    def start(self):
        """Ingest the data and persist it on the run."""
        ingested = ingest()
        self.raw_data = ingested
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

# Step 2: Process (triggered by ingest)
@trigger_on_finish(flow='DataIngest')
class DataProcess(FlowSpec):
    """Second pipeline stage: processes the `raw_data` produced by DataIngest."""

    @step
    def start(self):
        """Load the upstream raw data and store the processed result."""
        # Imported here so the example does not depend on a module-level import.
        from metaflow import Flow

        # latest_run can be a run that is still executing or that failed;
        # latest_successful_run only returns a run that completed successfully.
        upstream = Flow('DataIngest').latest_successful_run
        self.processed = process(upstream.data.raw_data)
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

# Step 3: Analyze (triggered by process)
@trigger_on_finish(flow='DataProcess')
class DataAnalyze(FlowSpec):
    """Third pipeline stage: analyzes the `processed` artifact of DataProcess."""

    @step
    def start(self):
        """Load the upstream processed data and analyze it."""
        # Imported here so the example does not depend on a module-level import.
        from metaflow import Flow

        # latest_run can be a run that is still executing or that failed;
        # latest_successful_run only returns a run that completed successfully.
        upstream = Flow('DataProcess').latest_successful_run
        analyze(upstream.data.processed)
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

Fan-Out Processing

# Main flow emits events
class MainPipeline(FlowSpec):
    """Flow that publishes the 'pipeline-complete' event for other flows."""

    @step
    def start(self):
        """Emit the custom event together with its payload."""
        # Payload fields that downstream flows can map to their parameters.
        payload = {
            'dataset': 'users',
            'records': 1000,
        }
        emit_event('pipeline-complete', payload)
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

# Multiple flows react to the event
@trigger(
    event={
        'name': 'pipeline-complete',
        # The flow parameter 'dataset' is read from the event field 'dataset'.
        'parameters': {'dataset': 'dataset'},
    }
)
class EmailReport(FlowSpec):
    """Reacts to 'pipeline-complete' by emailing a report for the dataset."""

    dataset = Parameter('dataset')

    @step
    def start(self):
        """Send the email report for the dataset named in the event."""
        target_dataset = self.dataset
        send_email_report(target_dataset)
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

@trigger(
    event={
        'name': 'pipeline-complete',
        # The flow parameter 'records' is read from the event field 'records'.
        'parameters': {'records': 'records'},
    }
)
class UpdateMetrics(FlowSpec):
    """Reacts to 'pipeline-complete' by updating the dashboard."""

    records = Parameter('records')

    @step
    def start(self):
        """Push the record count from the event to the dashboard."""
        record_count = self.records
        update_dashboard(record_count)
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""

Deployment

AWS Step Functions with EventBridge

# Deploy trigger-based flow
python flow.py step-functions create

# EventBridge rules are automatically configured

Argo Workflows with Argo Events

# Deploy trigger-based flow
python flow.py argo-workflows create

# EventSource and Sensor are created automatically

Best Practices

Always check whether current.trigger is set before using it; it is empty when the flow is run manually rather than by an event:
@step
def start(self):
    """Use the event payload when triggered, otherwise fall back to defaults."""
    # Manual runs have no trigger, so default data is substituted for them.
    data = current.trigger.payload if current.trigger else get_default_data()
Map event fields to parameters for type safety:
@trigger(
    event={
        'name': 'order-placed',
        # Each entry reads "flow parameter: event payload field".
        'parameters': {'order_id': 'orderId', 'amount': 'totalAmount'},
    }
)
class ProcessOrder(FlowSpec):
    """Flow whose typed parameters are filled from the 'order-placed' event."""

    # Filled from the event field 'orderId'.
    order_id = Parameter('order_id', type=str)
    # Filled from the event field 'totalAmount'.
    amount = Parameter('amount', type=float)
Test flows manually without events:
# Run without trigger
python flow.py run --order_id=123 --amount=99.99
Combine with @project for organized event namespaces:
@project(name='ml-platform')
@trigger(event=namespaced_event_name('training-complete'))

Event Triggering Guide

Complete guide to event-driven workflows

@project Decorator

Project namespacing for events

AWS Step Functions

Deploy event-driven flows to AWS

Argo Workflows

Deploy event-driven flows to Argo

Build docs developers (and LLMs) love