Execute flows automatically based on events and upstream flow completion
metaflow-dagster supports event-driven execution through two decorators that compile to Dagster SensorDefinition objects: @trigger for custom events and @trigger_on_finish for flow dependencies.
For @trigger, the compiler generates a sensor with a stub implementation:

    @sensor(job=ProcessDataFlow, name="ProcessDataFlow_on_event_0", default_status=DefaultSensorStatus.RUNNING)
    def ProcessDataFlow_on_event_0(context: SensorEvaluationContext):
        """Fire ProcessDataFlow when the 'data.ready' event is detected.

        Dagster does not have a native event-bus — this sensor polls for the event
        by checking a cursor value. Emit a RunRequest whenever you detect the event
        (e.g., by reading a queue, a file marker, or an external API).
        """
        # TODO: replace this stub with real event-detection logic.
        # Example: read from a queue, a DB table, or an HTTP endpoint.
        event_detected = False  # set to True when the event arrives
        if event_detected:
            run_config = {}
            yield RunRequest(run_key=context.cursor, run_config=run_config)

The generated @trigger sensor is a stub. You must implement the event-detection logic before the sensor will fire runs.
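One way to fill in the stub is to poll a marker file and use the sensor cursor to remember the last marker already handled. The helper below is a minimal sketch using only the standard library. The function name `detect_marker_event`, the marker file itself, and the choice of the file's modification time as the cursor value are all assumptions made for illustration; none of them come from metaflow-dagster.

```python
import os
from typing import Optional, Tuple


def detect_marker_event(
    marker_path: str, last_cursor: Optional[str]
) -> Tuple[bool, Optional[str]]:
    """Return (event_detected, new_cursor) for a hypothetical marker file.

    The cursor holds the marker's last-seen modification time in
    nanoseconds, so an unchanged marker is not reported twice.
    """
    try:
        mtime_ns = os.stat(marker_path).st_mtime_ns
    except FileNotFoundError:
        # No marker yet: nothing to report, cursor stays as it was.
        return False, last_cursor

    seen_ns = int(last_cursor) if last_cursor else -1
    if mtime_ns > seen_ns:
        # Marker is new or was rewritten since the last evaluation.
        return True, str(mtime_ns)
    return False, last_cursor
```

Inside the generated sensor body, the returned flag could replace the hard-coded `event_detected = False`, with the returned cursor passed as `run_key` and then stored through `context.update_cursor(...)`. A queue, database table, or HTTP endpoint would follow the same shape: compare what is visible now against the cursor, and advance the cursor only when a run is requested.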
For @trigger_on_finish, the compiler generates a fully functional sensor that polls Dagster’s run history:

    @sensor(job=DownstreamFlow, name="DownstreamFlow_on_finish_0", default_status=DefaultSensorStatus.RUNNING)
    def DownstreamFlow_on_finish_0(context: SensorEvaluationContext):
        """Fire DownstreamFlow when Dagster job 'UpstreamFlow' completes successfully.

        This sensor watches the Dagster run history for successful runs of the
        upstream job (compiled from the 'UpstreamFlow' Metaflow flow) and triggers
        a new run of DownstreamFlow for each new completion.
        """
        from dagster import DagsterInstance, RunsFilter, DagsterRunStatus

        instance = context.instance
        last_cursor = context.cursor or "0"
        runs = instance.get_runs(
            filters=RunsFilter(
                job_name='UpstreamFlow',
                statuses=[DagsterRunStatus.SUCCESS],
            ),
            limit=50,
        )
        new_cursor = last_cursor
        for run in runs:
            run_key = run.run_id
            if run_key > last_cursor:
                yield RunRequest(run_key=run_key)
                if run_key > new_cursor:
                    new_cursor = run_key
        context.update_cursor(new_cursor)

The @trigger_on_finish sensor is fully implemented and works without any additional code.
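To make the cursor handling in the generated loop easier to follow, the sketch below replays the same comparison on plain strings, with no Dagster dependency. `select_new_runs` is a hypothetical helper written for this illustration, and the run IDs are made up and chosen so that they sort in completion order.

```python
from typing import Iterable, List, Tuple


def select_new_runs(
    run_ids: Iterable[str], last_cursor: str = "0"
) -> Tuple[List[str], str]:
    """Mirror the generated sensor's loop on plain strings.

    Returns the run IDs that would each produce a RunRequest, plus the
    cursor value that would be stored afterwards.
    """
    to_trigger: List[str] = []
    new_cursor = last_cursor
    for run_id in run_ids:
        if run_id > last_cursor:  # plain string comparison, as in the sensor
            to_trigger.append(run_id)
            if run_id > new_cursor:
                new_cursor = run_id
    return to_trigger, new_cursor


# First evaluation: no cursor stored yet, so both runs are picked up.
first, cursor = select_new_runs(["a1", "b2"])

# Second evaluation: only the run that sorts after the stored cursor is new.
second, cursor_after = select_new_runs(["a1", "b2", "c3"], cursor)
```

Here `first` contains both IDs and `second` contains only `"c3"`, because each evaluation requests runs only for IDs that compare greater than the stored cursor. The comparison is a plain string comparison, exactly as in the generated loop.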
Find your sensor (e.g., DownstreamFlow_on_finish_0)
Toggle to Running to activate
View evaluation history and triggered runs
Sensors require the Dagster daemon to be running. When using dagster dev, the daemon starts automatically. For production deployments, ensure the daemon is configured and running separately.