metaflow-dagster supports event-driven execution through two decorators that compile to Dagster SensorDefinition objects: @trigger for custom events and @trigger_on_finish for flow dependencies.

Overview

Both decorators automatically generate sensors in the compiled Dagster file:
  • @trigger: Watches for custom events (requires implementation)
  • @trigger_on_finish: Watches for upstream Dagster job completion
Sensors are included in the Definitions object and visible in the Dagster UI alongside schedules and jobs.

@trigger: Event-driven execution

The @trigger decorator creates a sensor stub that fires your flow when a named event occurs.

Basic usage

from metaflow import FlowSpec, step, trigger

@trigger(event="data.ready")
class ProcessDataFlow(FlowSpec):
    @step
    def start(self):
        print("Processing data...")
        self.next(self.end)
    
    @step
    def end(self):
        pass

Generated sensor

The compiler generates a sensor with a stub implementation:
@sensor(job=ProcessDataFlow, name="ProcessDataFlow_on_event_0", 
        default_status=DefaultSensorStatus.RUNNING)
def ProcessDataFlow_on_event_0(context: SensorEvaluationContext):
    """Fire ProcessDataFlow when the 'data.ready' event is detected.
    
    Dagster does not have a native event-bus — this sensor polls for the event
    by checking a cursor value. Emit a RunRequest whenever you detect the event
    (e.g., by reading a queue, a file marker, or an external API).
    """
    # TODO: replace this stub with real event-detection logic.
    # Example: read from a queue, a DB table, or an HTTP endpoint.
    event_detected = False  # set to True when the event arrives
    if event_detected:
        run_config = {}
        yield RunRequest(run_key=context.cursor, run_config=run_config)
The generated @trigger sensor is a stub. You must implement the event-detection logic before the sensor will fire runs.

Implementing event detection

1. Generate the definitions file

python my_flow.py dagster create dagster_defs.py
2. Edit the sensor function

Open dagster_defs.py and find the sensor function (e.g., ProcessDataFlow_on_event_0).
3. Add event-detection logic

Replace the stub with real event polling:
@sensor(job=ProcessDataFlow, name="ProcessDataFlow_on_event_0",
        default_status=DefaultSensorStatus.RUNNING)
def ProcessDataFlow_on_event_0(context: SensorEvaluationContext):
    # Example: check S3 for new files. Assumes object keys sort
    # lexicographically in arrival order (e.g., timestamped prefixes).
    import boto3
    s3 = boto3.client('s3')
    
    last_key = context.cursor or ""
    response = s3.list_objects_v2(Bucket='my-bucket', Prefix='data/')
    
    for obj in response.get('Contents', []):
        key = obj['Key']
        if key > last_key:
            yield RunRequest(run_key=key)
            context.update_cursor(key)
4. Launch Dagster

dagster dev -f dagster_defs.py
The sensor evaluates on a regular interval (default: 30 seconds).
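The cursor comparison in the S3 example above is plain lexicographic string ordering, so the detection logic can be exercised without Dagster or boto3. A minimal sketch (the helper name and key layout are illustrative, not part of the generated file):

```python
def new_keys_since(cursor, keys):
    """Return keys that sort after the cursor, plus the advanced cursor.

    Mirrors the sensor's dedup behavior: each key triggers at most one
    run, and the cursor only moves forward.
    """
    fresh = sorted(k for k in keys if k > (cursor or ""))
    return fresh, (fresh[-1] if fresh else cursor)

listing = ["data/2024-01-01.csv", "data/2024-01-02.csv"]
fresh, cursor = new_keys_since("", listing)
# fresh == listing; cursor == "data/2024-01-02.csv"

listing.append("data/2024-01-03.csv")
fresh, cursor = new_keys_since(cursor, listing)
# fresh == ["data/2024-01-03.csv"]
```

Because the comparison is lexicographic, this approach only works when newer objects sort later, e.g. with timestamped or zero-padded key names.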

Mapping event data to parameters

Pass event fields to flow parameters using the parameters mapping:
from metaflow import FlowSpec, step, trigger, Parameter

@trigger(event="order.created", parameters={"order_id": "id"})
class ProcessOrderFlow(FlowSpec):
    order_id = Parameter("order_id", default="")
    
    @step
    def start(self):
        print(f"Processing order {self.order_id}")
        self.next(self.end)
    
    @step
    def end(self):
        pass
The generated sensor includes a run_config snippet that maps event fields to flow parameters.
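The mapping the sensor applies can be sketched as plain Python. This is a hedged illustration, not the generated code: the helper name, the payload shape, and the `"start"` op name are assumptions (the actual op naming depends on how the compiler structures the job).

```python
def build_run_config(event_payload, parameter_map):
    """Map fields from an event payload onto Metaflow parameter names.

    parameter_map mirrors the decorator argument, e.g. {"order_id": "id"}:
    keys are flow parameter names, values are event-payload fields.
    """
    ops_config = {
        flow_param: event_payload[event_field]
        for flow_param, event_field in parameter_map.items()
        if event_field in event_payload
    }
    # Dagster run_config nests per-op config; "start" is assumed here.
    return {"ops": {"start": {"config": ops_config}}}

event = {"id": "ord-123", "total": 42.0}
config = build_run_config(event, {"order_id": "id"})
# config == {"ops": {"start": {"config": {"order_id": "ord-123"}}}}
```

Missing event fields are simply skipped, so the flow falls back to the Parameter's default value.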

@trigger_on_finish: Flow dependencies

The @trigger_on_finish decorator creates a sensor that automatically fires when an upstream Dagster job completes successfully.

Basic usage

from metaflow import FlowSpec, step, trigger_on_finish

@trigger_on_finish(flow="UpstreamFlow")
class DownstreamFlow(FlowSpec):
    @step
    def start(self):
        print("Upstream completed, running downstream...")
        self.next(self.end)
    
    @step
    def end(self):
        pass

Generated sensor

The compiler generates a fully functional sensor that polls Dagster’s run history:
@sensor(job=DownstreamFlow, name="DownstreamFlow_on_finish_0",
        default_status=DefaultSensorStatus.RUNNING)
def DownstreamFlow_on_finish_0(context: SensorEvaluationContext):
    """Fire DownstreamFlow when Dagster job 'UpstreamFlow' completes successfully.
    
    This sensor watches the Dagster run history for successful runs of the
    upstream job (compiled from the 'UpstreamFlow' Metaflow flow) and
    triggers a new run of DownstreamFlow for each new completion.
    """
    from dagster import DagsterInstance, RunsFilter, DagsterRunStatus
    instance = context.instance
    last_cursor = context.cursor or "0"
    runs = instance.get_runs(
        filters=RunsFilter(
            job_name='UpstreamFlow',
            statuses=[DagsterRunStatus.SUCCESS],
        ),
        limit=50,
    )
    new_cursor = last_cursor
    for run in runs:
        run_key = run.run_id
        if run_key > last_cursor:
            yield RunRequest(run_key=run_key)
            if run_key > new_cursor:
                new_cursor = run_key
    context.update_cursor(new_cursor)
The @trigger_on_finish sensor is fully implemented and requires no additional code. It works out of the box.

How it works

1. Sensor polls run history

Every evaluation interval (default: 30 seconds), the sensor queries the Dagster instance for new successful runs of the upstream job.
2. Cursor tracks progress

The sensor uses context.cursor to remember the last processed run ID, ensuring each upstream completion triggers exactly one downstream run.
3. RunRequest fires the job

For each new successful run, the sensor yields a RunRequest with a unique run_key.
4. Downstream flow executes

Dagster launches the downstream job automatically.
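The cursor-advancement logic from steps 1–3 can be reproduced as a standalone function, which is useful for reasoning about it without a Dagster instance. Names are illustrative; the real sensor fetches run IDs via `instance.get_runs`:

```python
def runs_to_trigger(cursor, completed_run_ids):
    """Given the stored cursor and the IDs of successful upstream runs,
    return the run IDs to fire and the new cursor value.

    Matches the generated sensor: only IDs that sort after the cursor
    fire a RunRequest, and the cursor advances to the highest ID seen.
    """
    last = cursor or "0"
    to_fire = [rid for rid in completed_run_ids if rid > last]
    new_cursor = max(to_fire, default=last)
    return to_fire, new_cursor

to_fire, cursor = runs_to_trigger(None, ["run-001", "run-002"])
# to_fire == ["run-001", "run-002"]; cursor == "run-002"
to_fire, cursor = runs_to_trigger(cursor, ["run-001", "run-002", "run-003"])
# to_fire == ["run-003"]
```

Note that, like the generated sensor, this compares run IDs lexicographically, so each evaluation is idempotent: re-listing already-processed runs fires nothing new.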

Building pipelines with trigger_on_finish

Chain multiple flows to create a dependency graph:
# Flow 1: Extract data
from metaflow import FlowSpec, step, trigger_on_finish

class ExtractFlow(FlowSpec):
    @step
    def start(self):
        # Extract from source
        self.next(self.end)
    
    @step
    def end(self):
        pass

# Flow 2: Transform data (runs after ExtractFlow)
@trigger_on_finish(flow="ExtractFlow")
class TransformFlow(FlowSpec):
    @step
    def start(self):
        # Transform extracted data
        self.next(self.end)
    
    @step
    def end(self):
        pass

# Flow 3: Load data (runs after TransformFlow)
@trigger_on_finish(flow="TransformFlow")
class LoadFlow(FlowSpec):
    @step
    def start(self):
        # Load transformed data
        self.next(self.end)
    
    @step
    def end(self):
        pass
Compile all three flows to the same definitions file or deploy them separately.
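One way to wire the three flows together is one definitions file per flow, all loaded into the same Dagster instance so the `trigger_on_finish` sensors can see the upstream jobs' run history. The file names below are illustrative:

```shell
# Compile each flow to its own definitions file...
python extract_flow.py dagster create defs_extract.py
python transform_flow.py dagster create defs_transform.py
python load_flow.py dagster create defs_load.py

# ...then load all three as code locations in one dev instance.
dagster dev -f defs_extract.py -f defs_transform.py -f defs_load.py
```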

Multiple triggers

A flow can have multiple @trigger or @trigger_on_finish decorators:
from metaflow import FlowSpec, step, trigger_on_finish

@trigger_on_finish(flow="FlowA")
@trigger_on_finish(flow="FlowB")
class AggregatorFlow(FlowSpec):
    @step
    def start(self):
        # Runs when either FlowA or FlowB completes
        self.next(self.end)
    
    @step
    def end(self):
        pass
Each decorator generates a separate sensor. The generated file includes all sensors in the Definitions object:
defs = Definitions(
    jobs=[AggregatorFlow],
    schedules=[],
    sensors=[AggregatorFlow_on_finish_0, AggregatorFlow_on_finish_1],
)

Managing sensors in Dagster

Sensors are visible in the Dagster UI:
  1. Navigate to Overview → Sensors
  2. Find your sensor (e.g., DownstreamFlow_on_finish_0)
  3. Toggle to Running to activate
  4. View evaluation history and triggered runs
Sensors require the Dagster daemon to be running. When using dagster dev, the daemon starts automatically. For production deployments, ensure the daemon is configured and running separately.
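For production, a minimal setup runs the webserver and the daemon as two long-lived processes sharing one instance directory; the paths and workspace file below are illustrative, and deployment specifics (systemd, containers, Helm) vary:

```shell
# Both processes must point at the same DAGSTER_HOME so sensor
# cursors, ticks, and run history are shared between them.
export DAGSTER_HOME=/opt/dagster/home

dagster-webserver -w workspace.yaml &
dagster-daemon run -w workspace.yaml
```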

Combining triggers with schedules

Flows can have both schedules and triggers:
from metaflow import FlowSpec, step, schedule, trigger_on_finish

@schedule(daily=True)
@trigger_on_finish(flow="UpstreamFlow")
class HybridFlow(FlowSpec):
    @step
    def start(self):
        # Runs daily AND when UpstreamFlow completes
        self.next(self.end)
    
    @step
    def end(self):
        pass
The generated file includes both a ScheduleDefinition and a SensorDefinition.

Next steps

Scheduling

Learn about time-based execution with @schedule

Resume Failed Runs

Recover from failures by reusing completed steps