Metaflow supports event-driven workflows that automatically trigger when specific events occur, enabling reactive data pipelines and complex workflow orchestrations.

Event Decorators

Metaflow provides two decorators for event-driven workflows:
  • @trigger: Trigger flows based on custom events
  • @trigger_on_finish: Trigger flows when upstream flows complete

Triggering on Custom Events

The @trigger decorator allows flows to be triggered by custom events.

Basic Event Triggering

from metaflow import FlowSpec, step, trigger

@trigger(event='data.updated')
class DataProcessorFlow(FlowSpec):
    """Triggered when data.updated event is received"""
    
    @step
    def start(self):
        print("Processing updated data...")
        self.next(self.end)
    
    @step
    def end(self):
        print("Processing complete")

if __name__ == '__main__':
    DataProcessorFlow()
Deploy the flow:
python data_processor.py argo-workflows create
The flow will now trigger whenever a data.updated event is emitted.

Multiple Events

Trigger on any of multiple events:
from metaflow import FlowSpec, step, trigger

@trigger(events=['user.created', 'user.updated', 'user.deleted'])
class UserSyncFlow(FlowSpec):
    """Triggered by any user-related event"""
    
    @step
    def start(self):
        from metaflow import current
        
        # Access the trigger information
        trigger_info = current.trigger
        print(f"Triggered by event: {trigger_info.event.name}")
        
        self.next(self.end)
    
    @step
    def end(self):
        pass

Event Parameter Mapping

Map event payload fields to flow parameters:
from metaflow import FlowSpec, step, trigger, Parameter

@trigger(event={
    'name': 'order.placed',
    'parameters': {
        'order_id': 'orderId',      # Map orderId from event to order_id parameter
        'customer_id': 'customerId',
        'amount': 'amount'
    }
})
class OrderProcessorFlow(FlowSpec):
    """Process orders with data from event payload"""
    
    order_id = Parameter('order_id', default='unknown')
    customer_id = Parameter('customer_id', default='unknown')
    amount = Parameter('amount', default=0)
    
    @step
    def start(self):
        print(f"Processing order {self.order_id}")
        print(f"Customer: {self.customer_id}, Amount: {self.amount}")
        self.next(self.end)
    
    @step
    def end(self):
        pass
When an event like this is emitted:
{
  "orderId": "ORD-12345",
  "customerId": "CUST-67890",
  "amount": 99.99
}
The flow receives the mapped parameters.
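Metaflow performs this mapping for you at trigger time. Conceptually, it behaves like the sketch below (the helper name is hypothetical, for illustration only):

```python
def map_event_payload(payload, mapping):
    """Map event payload fields to flow parameter values.

    mapping: {flow_parameter: event_field}. Fields absent from the
    payload are skipped, leaving the parameter's default in place.
    """
    return {param: payload[field] for param, field in mapping.items() if field in payload}

payload = {"orderId": "ORD-12345", "customerId": "CUST-67890", "amount": 99.99}
mapping = {"order_id": "orderId", "customer_id": "customerId", "amount": "amount"}
params = map_event_payload(payload, mapping)
# params["order_id"] == "ORD-12345"
```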

Simplified Parameter Mapping

When a parameter name matches the event field name, list it directly; use a (parameter, field) tuple to map differing names:
@trigger(event={
    'name': 'data.ready',
    'parameters': ['dataset_name', 'timestamp', ('path', 'dataPath')]
})
class DataFlow(FlowSpec):
    # Maps dataset_name -> dataset_name
    # Maps timestamp -> timestamp
    # Maps dataPath -> path
    
    dataset_name = Parameter('dataset_name', default='')
    timestamp = Parameter('timestamp', default='')
    path = Parameter('path', default='')
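The shorthand list is equivalent to an explicit mapping. A small sketch of the expansion (helper name hypothetical):

```python
def normalize_spec(spec):
    """Expand the shorthand list into an explicit {parameter: field} mapping."""
    out = {}
    for item in spec:
        if isinstance(item, str):
            out[item] = item     # same name on both sides
        else:
            param, field = item  # (parameter, event_field) tuple
            out[param] = field
    return out

result = normalize_spec(['dataset_name', 'timestamp', ('path', 'dataPath')])
# {'dataset_name': 'dataset_name', 'timestamp': 'timestamp', 'path': 'dataPath'}
```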

Triggering on Flow Completion

The @trigger_on_finish decorator creates flow dependencies where one flow automatically triggers when another completes successfully.

Basic Flow Dependencies

from metaflow import FlowSpec, step, trigger_on_finish

# Upstream flow
class ETLFlow(FlowSpec):
    @step
    def start(self):
        # Extract and transform data
        self.data = self.process_data()
        self.next(self.end)
    
    @step
    def end(self):
        self.save_data(self.data)

# Downstream flow
@trigger_on_finish(flow='ETLFlow')
class MLTrainingFlow(FlowSpec):
    """Triggered when ETLFlow completes successfully"""
    
    @step
    def start(self):
        # Access upstream run data
        from metaflow import current
        
        trigger = current.trigger
        upstream_run = trigger.run
        
        print(f"Triggered by {upstream_run.pathspec}")
        self.data = upstream_run.data.data
        
        self.next(self.train)
    
    @step
    def train(self):
        self.model = self.train_model(self.data)
        self.next(self.end)
    
    @step
    def end(self):
        pass

Multiple Upstream Dependencies

Trigger after all of several upstream flows have completed:
@trigger_on_finish(flows=['DataPipelineA', 'DataPipelineB', 'DataPipelineC'])
class AggregatorFlow(FlowSpec):
    """Triggered after all listed pipelines complete"""
    
    @step
    def start(self):
        from metaflow import current
        
        # With multiple upstream flows, current.trigger.runs lists the
        # triggering run from each one
        for run in current.trigger.runs:
            print(f"Triggered by: {run.pathspec}")
        self.next(self.end)
    
    @step
    def end(self):
        pass

Project-Aware Dependencies

When using @project, dependencies respect project namespaces:
from metaflow import FlowSpec, step, project, trigger_on_finish

@project(name='data_platform')
class DataIngestFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.end)
    
    @step
    def end(self):
        pass

# Automatically scoped to same project
@project(name='data_platform')
@trigger_on_finish(flow='DataIngestFlow')
class DataProcessorFlow(FlowSpec):
    """Triggered by DataIngestFlow in the same project/branch"""
    pass

Cross-Project Dependencies

Reference flows in different projects or branches:
# Fully qualified flow name
@trigger_on_finish(flow='data_platform.prod.DataIngestFlow')
class ConsumerFlow(FlowSpec):
    pass

# Or specify separately
@trigger_on_finish(flow={
    'name': 'DataIngestFlow',
    'project': 'data_platform',
    'project_branch': 'prod'
})
class ConsumerFlow(FlowSpec):
    pass
Project branches follow the pattern:
  • prod - production
  • user.<username> - user branch
  • test.<name> - test branch
  • prod.staging - production staging
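The fully qualified names used above follow a simple project.branch.flow pattern. A sketch of the convention (helper names hypothetical):

```python
def qualified_flow_name(project, branch, flow):
    """Build a fully qualified flow name for cross-project dependencies."""
    return f"{project}.{branch}.{flow}"

def user_branch(username):
    """Branch name for a user's personal deployment."""
    return f"user.{username}"

qualified_flow_name("data_platform", "prod", "DataIngestFlow")
# -> "data_platform.prod.DataIngestFlow"
qualified_flow_name("data_platform", user_branch("alice"), "DataIngestFlow")
# -> "data_platform.user.alice.DataIngestFlow"
```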

Accessing Trigger Information

Inside triggered flows, access trigger metadata:
from metaflow import FlowSpec, step, trigger_on_finish, current

@trigger_on_finish(flow='UpstreamFlow')
class DownstreamFlow(FlowSpec):
    
    @step
    def start(self):
        # Access trigger information
        trigger = current.trigger
        
        if trigger:
            print(f"Triggered by: {trigger.run.pathspec}")
            print(f"Upstream flow: {trigger.run.pathspec.split('/')[0]}")
            print(f"Upstream run ID: {trigger.run.id}")
            
            # Access upstream artifacts
            upstream_data = trigger.run.data
            
            # Check if specific artifact exists
            if hasattr(upstream_data, 'model'):
                self.model = upstream_data.model
        
        self.next(self.end)
    
    @step
    def end(self):
        pass

Namespaced Events

Use namespaced_event_name to automatically scope events to projects:
from metaflow import FlowSpec, step, project, trigger, namespaced_event_name

@project(name='analytics')
@trigger(event=namespaced_event_name('report.complete'))
class ReportConsumerFlow(FlowSpec):
    """Event name will be: analytics.<branch>.report.complete"""
    pass

Deploy-Time Event Configuration

Compute event names dynamically at deploy time:
from metaflow import FlowSpec, step, trigger

def get_event_name(ctx):
    """Called at deploy time with the deployment context"""
    prefix = ctx.flow_name.lower()
    return f'{prefix}.data.updated'

@trigger(event=get_event_name)
class DynamicEventFlow(FlowSpec):
    pass
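With a stand-in context object you can check what name the callable would produce. The real context is supplied by Metaflow at deploy time; this Ctx stub and its flow_name attribute are assumptions for illustration:

```python
class Ctx:
    """Stand-in for the deploy-time context (flow_name attribute assumed)."""
    flow_name = "DynamicEventFlow"

def get_event_name(ctx):
    """Same callable as above, applied to the stub context."""
    return f"{ctx.flow_name.lower()}.data.updated"

get_event_name(Ctx())
# -> "dynamiceventflow.data.updated"
```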

Complex Event Scenarios

Multi-Stage Pipeline

from metaflow import FlowSpec, step, trigger_on_finish, schedule

# Stage 1: Scheduled data ingestion
@schedule(hourly=True)
class IngestFlow(FlowSpec):
    @step
    def start(self):
        self.raw_data = self.fetch_data()
        self.next(self.end)
    
    @step
    def end(self):
        pass

# Stage 2: Data processing (triggered by ingest)
@trigger_on_finish(flow='IngestFlow')
class ProcessFlow(FlowSpec):
    @step
    def start(self):
        from metaflow import current
        self.data = current.trigger.run.data.raw_data
        self.processed = self.clean(self.data)
        self.next(self.end)
    
    @step
    def end(self):
        pass

# Stage 3: Model training (triggered by processing)
@trigger_on_finish(flow='ProcessFlow')
class TrainFlow(FlowSpec):
    @step
    def start(self):
        from metaflow import current
        self.data = current.trigger.run.data.processed
        self.model = self.train(self.data)
        self.next(self.end)
    
    @step
    def end(self):
        pass

# Stage 4: Model deployment (triggered by training)
@trigger_on_finish(flow='TrainFlow')
class DeployFlow(FlowSpec):
    @step
    def start(self):
        from metaflow import current
        self.model = current.trigger.run.data.model
        self.deploy_model(self.model)
        self.next(self.end)
    
    @step
    def end(self):
        pass

Fan-Out/Fan-In Pattern

from metaflow import FlowSpec, step, trigger_on_finish

# Single upstream flow
class DataPrepFlow(FlowSpec):
    @step
    def start(self):
        self.dataset = self.prepare_data()
        self.next(self.end)
    
    @step
    def end(self):
        pass

# Multiple parallel consumers
@trigger_on_finish(flow='DataPrepFlow')
class ModelA_Flow(FlowSpec):
    @step
    def start(self):
        self.model_a = self.train_model_a()
        self.next(self.end)
    
    @step
    def end(self):
        pass

@trigger_on_finish(flow='DataPrepFlow')
class ModelB_Flow(FlowSpec):
    @step
    def start(self):
        self.model_b = self.train_model_b()
        self.next(self.end)
    
    @step
    def end(self):
        pass

# Aggregator waits for both models
@trigger_on_finish(flows=['ModelA_Flow', 'ModelB_Flow'])
class EnsembleFlow(FlowSpec):
    @step
    def start(self):
        from metaflow import current
        
        # current.trigger.runs holds the completed run from each upstream flow
        runs = {run.pathspec.split('/')[0]: run for run in current.trigger.runs}
        model_a = runs['ModelA_Flow'].data.model_a
        model_b = runs['ModelB_Flow'].data.model_b
        
        self.ensemble = self.create_ensemble(model_a, model_b)
        self.next(self.end)
    
    @step
    def end(self):
        pass

Testing Event-Driven Flows

Testing with the --trigger Option

Test @trigger_on_finish locally:
# Run the upstream flow first
python upstream_flow.py run

# Trigger downstream with specific run
python downstream_flow.py run --trigger UpstreamFlow/12345

Manual Event Testing

For custom events, you'll need to emit events through your orchestrator's event system (e.g., Argo Events, AWS EventBridge). On Argo, Metaflow's ArgoEvent helper (importable from metaflow.integrations) can publish events directly from Python.

Platform-Specific Features

Argo Workflows + Argo Events

Argo Workflows uses Argo Events for event handling:
# Deploy flow with event trigger
python myflow.py argo-workflows create --auto-emit-argo-events
Argo Events sensors are automatically created to listen for trigger events.

AWS Step Functions + EventBridge

Step Functions uses Amazon EventBridge:
# Deploy flow with event trigger
python myflow.py step-functions create
EventBridge rules are automatically configured for @trigger_on_finish decorators.

Best Practices

  • Design for idempotency: event-driven flows may be triggered multiple times, so ensure your flows can handle duplicate triggers safely.
  • Always check if upstream artifacts exist before accessing them:
trigger = current.trigger
if trigger and hasattr(trigger.run.data, 'dataset'):
    self.data = trigger.run.data.dataset
else:
    # Handle missing data
    pass
  • Use @project and namespaced_event_name to avoid event name collisions across teams.
  • Set up monitoring to track event delivery and flow triggering success rates.
  • Maintain documentation of flow dependencies to understand the execution graph.
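The duplicate-trigger concern above can be sketched with an idempotency key. The helper and in-memory set are hypothetical; in practice you would back the key store with a database table or object-store marker:

```python
processed = set()  # stand-in for durable storage (e.g., a database table)

def process_once(key, fn):
    """Run fn only the first time this idempotency key is seen."""
    if key in processed:
        return None  # duplicate trigger: skip the work
    processed.add(key)
    return fn()

first = process_once("order.placed/ORD-12345", lambda: "processed")
duplicate = process_once("order.placed/ORD-12345", lambda: "processed")
# first == "processed", duplicate is None
```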

Troubleshooting

Flow Not Triggering

  1. Check event names: Ensure event names match exactly
  2. Verify deployment: Flow must be deployed, not just run locally
  3. Check permissions: Ensure orchestrator has permissions to trigger flows
  4. Review logs: Check orchestrator logs for event delivery issues

Parameter Mapping Issues

  1. Verify parameter defaults: All parameters need defaults for event flows
  2. Check field names: Event payload fields must match parameter mapping
  3. Review event payload: Log actual event payloads to verify structure

Next Steps

  • Scheduling: combine events with scheduled flows
  • Monitoring: monitor event-driven pipelines
  • Orchestrators: platform-specific event features
