The @sensor decorator creates a sensor that can trigger runs based on custom logic. Sensors are continuously evaluated and can create run requests based on external state.
Signature
@sensor(
    job_name: Optional[str] = None,
    name: Optional[str] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    job: Optional[ExecutableDefinition] = None,
    jobs: Optional[Sequence[ExecutableDefinition]] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
    asset_selection: Optional[CoercibleToAssetSelection] = None,
    required_resource_keys: Optional[Set[str]] = None,
    tags: Optional[Mapping[str, str]] = None,
    metadata: Optional[RawMetadataMapping] = None,
    target: Optional[Union[
        CoercibleToAssetSelection,
        AssetsDefinition,
        JobDefinition,
        UnresolvedAssetJobDefinition,
    ]] = None,
    owners: Optional[Sequence[str]] = None,
) -> SensorDefinition
Parameters
name
Optional[str]
The name of the sensor. Defaults to the name of the decorated function.
minimum_interval_seconds
Optional[int]
The minimum number of seconds that will elapse between sensor evaluations.
description
Optional[str]
A human-readable description of the sensor.
job
Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]
The job to be executed when the sensor fires.
jobs
Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]
A list of jobs to be executed when the sensor fires.
default_status
DefaultSensorStatus
default:"DefaultSensorStatus.STOPPED"
Whether the sensor starts as running or not. The default status can be overridden from the Dagster UI or via the GraphQL API.
asset_selection
Optional[Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]]
An asset selection to launch a run for if the sensor condition is met. This can be provided instead of specifying a job.
required_resource_keys
Optional[Set[str]]
A set of resource keys that must be available on the context when the sensor evaluation function runs. Use this to declare the resources your sensor function depends on.
tags
Optional[Mapping[str, str]]
A set of key-value tags that annotate the sensor and can be used for searching and filtering in the UI.
metadata
Optional[Mapping[str, object]]
A set of metadata entries that annotate the sensor. Values will be normalized to typed MetadataValue objects.
target
Optional[Union[CoercibleToAssetSelection, AssetsDefinition, JobDefinition, UnresolvedAssetJobDefinition]]
The target that the sensor will execute. It can take AssetSelection objects and anything coercible to it (e.g. str, Sequence[str], AssetKey, AssetsDefinition). It can also accept JobDefinition and UnresolvedAssetJobDefinition objects. This parameter will replace job, jobs, and asset_selection.
owners
Optional[Sequence[str]]
A sequence of strings identifying the owners of the sensor.
Returns
Type: SensorDefinition
A sensor definition object.
Evaluation Function Return Values
The decorated function may return or yield:
- A RunRequest object
- A list of RunRequest objects
- A SkipReason object, providing a descriptive message of why no runs were requested
- Nothing (skipping without providing a reason)
- When written as a generator, a single yielded SkipReason or one or more yielded RunRequest objects
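One consequence of mixing the two styles is worth spelling out. Once a function body contains `yield`, Python treats the whole function as a generator, and a value passed to `return` is not delivered to whoever iterates over it. The sketch below shows the difference in plain Python; the `RunRequest` and `SkipReason` classes here are simplified stand-ins for illustration, not the Dagster classes, and both function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterator, List, Union


# Stand-ins for illustration only; the real classes live in dagster.
@dataclass
class RunRequest:
    run_key: str


@dataclass
class SkipReason:
    skip_message: str


def returns_in_generator(records: List[int]) -> Iterator[Union[RunRequest, SkipReason]]:
    if not records:
        # This function also contains `yield`, so it is a generator: the
        # returned value is attached to StopIteration and never reaches
        # code that simply iterates over the results.
        return SkipReason("No new records")
    for record_id in records:
        yield RunRequest(run_key=str(record_id))


def yields_in_generator(records: List[int]) -> Iterator[Union[RunRequest, SkipReason]]:
    if not records:
        # Yield the skip reason, then stop the generator explicitly.
        yield SkipReason("No new records")
        return
    for record_id in records:
        yield RunRequest(run_key=str(record_id))


lost = list(returns_in_generator([]))   # the SkipReason is dropped
kept = list(yields_in_generator([]))    # the SkipReason is delivered
```

In a sensor that yields run requests, the skip branch should therefore yield its SkipReason and then return, rather than return it.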
Examples
Basic Sensor
from dagster import sensor, RunRequest, SkipReason, SensorEvaluationContext
import os

# my_job is assumed to be a job defined elsewhere
@sensor(job=my_job)
def my_sensor(context: SensorEvaluationContext):
    # Request a run when the trigger file exists; otherwise explain the skip
    if os.path.exists("/tmp/trigger_file"):
        return RunRequest(run_key="trigger_run")
    else:
        return SkipReason("Trigger file not found")
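Dagster uses `run_key` to deduplicate requests: for a given sensor, at most one run is launched per distinct key, so a constant key such as the one in the basic sensor fires only once no matter how often the file reappears. The following is a minimal sketch of one way to derive a key that changes whenever the file is rewritten. It is plain Python; the helper name `trigger_run_key` and the key format are assumptions made for illustration and are not part of the Dagster API.

```python
import os
from typing import Optional


def trigger_run_key(path: str) -> Optional[str]:
    """Return a key that changes when the file is modified, or None if it is absent."""
    try:
        stat = os.stat(path)
    except FileNotFoundError:
        return None
    # The nanosecond modification time distinguishes successive writes
    # to the same path.
    return f"{os.path.basename(path)}:{stat.st_mtime_ns}"
```

Inside a sensor, a `None` result would map to a SkipReason and any other value would be passed as the `run_key` of the RunRequest.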
Sensor with Run Configuration
from dagster import sensor, RunRequest, SensorEvaluationContext
import json

@sensor(job=my_job, minimum_interval_seconds=30)
def config_sensor(context: SensorEvaluationContext):
    # Read the values used to build the run configuration
    with open("/tmp/config.json") as f:
        config_data = json.load(f)
    return RunRequest(
        run_key=f"run_{config_data['id']}",
        run_config={
            "ops": {
                "my_op": {
                    "config": {"value": config_data["value"]}
                }
            }
        },
    )
Sensor with Cursor (Tracking State)
from dagster import sensor, RunRequest, SkipReason, SensorEvaluationContext
import requests

@sensor(job=my_job)
def api_sensor(context: SensorEvaluationContext):
    # Get the last processed ID from the cursor
    last_id = int(context.cursor) if context.cursor else 0
    # Fetch new records from API
    response = requests.get(f"https://api.example.com/records?since={last_id}")
    records = response.json()
    if not records:
        # This function is a generator, so the skip reason must be yielded
        yield SkipReason("No new records")
        return
    # Create a run request for each new record
    for record in records:
        yield RunRequest(
            run_key=str(record["id"]),
            run_config={"ops": {"process_record": {"config": record}}},
        )
    # Update cursor to the latest record ID
    context.update_cursor(str(records[-1]["id"]))
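The cursor logic in this example can be isolated into a pure function, which makes it easy to check without a Dagster instance. This is a sketch under assumptions: `advance_cursor` is a hypothetical helper, record IDs are assumed to be integers, and it sorts the new records by ID so that the stored cursor does not depend on the API returning them in ascending order.

```python
from typing import Any, Dict, List, Optional, Tuple


def advance_cursor(
    cursor: Optional[str], records: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Return the records newer than the cursor and the cursor to store next."""
    # An unset cursor means nothing has been processed yet.
    last_id = int(cursor) if cursor else 0
    new_records = sorted(
        (r for r in records if r["id"] > last_id), key=lambda r: r["id"]
    )
    if not new_records:
        # Nothing new: keep the existing cursor unchanged.
        return [], cursor
    # The highest ID seen becomes the next cursor value.
    return new_records, str(new_records[-1]["id"])
```

A sensor could call this with `context.cursor` and the fetched records, yield one RunRequest per returned record, and pass the second element to `context.update_cursor` when it differs from the current cursor.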
Sensor Targeting Assets
from dagster import sensor, RunRequest, SkipReason, AssetSelection, SensorEvaluationContext

@sensor(
    asset_selection=AssetSelection.groups("finance"),
    minimum_interval_seconds=60,
)
def finance_sensor(context: SensorEvaluationContext):
    # Check if finance data is ready
    if finance_data_is_ready():
        return RunRequest(
            run_key="finance_run",
            tags={"source": "finance_sensor"},
        )
    return SkipReason("Finance data not ready")
Sensor with Multiple Jobs
from dagster import sensor, RunRequest, SensorEvaluationContext

@sensor(jobs=[job_a, job_b])
def multi_job_sensor(context: SensorEvaluationContext):
    # Trigger different jobs based on conditions
    if should_run_job_a():
        yield RunRequest(
            run_key="job_a_run",
            job_name="job_a",
        )
    if should_run_job_b():
        yield RunRequest(
            run_key="job_b_run",
            job_name="job_b",
        )
Sensor with Resources
from dagster import sensor, RunRequest, SensorEvaluationContext

@sensor(
    job=my_job,
    required_resource_keys={"s3_client"},
)
def s3_sensor(context: SensorEvaluationContext):
    # The resource is available on the context because its key is declared above
    s3 = context.resources.s3_client
    new_files = s3.list_new_files("my-bucket")
    for file in new_files:
        yield RunRequest(
            run_key=file.name,
            run_config={
                "ops": {
                    "process_file": {"config": {"file_path": file.path}}
                }
            },
        )
Sensor with Tags and Metadata
from dagster import sensor, RunRequest, SensorEvaluationContext, MetadataValue

@sensor(
    job=my_job,
    tags={"team": "data-platform", "priority": "high"},
    metadata={
        "owner": "[email protected]",
        "documentation": MetadataValue.url("https://docs.example.com/sensors"),
    },
    owners=["team:data-platform"],
)
def documented_sensor(context: SensorEvaluationContext):
    return RunRequest(
        run_key="documented_run",
        tags={"triggered_by": "documented_sensor"},
    )
Sensor with Default Running Status
from dagster import sensor, RunRequest, DefaultSensorStatus

@sensor(
    job=my_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=300,
)
def always_on_sensor():
    # This sensor will be running by default
    if should_trigger():
        return RunRequest(run_key="auto_run")
See Also
- @schedule - Time-based scheduling
- @asset - Define software-defined assets
- Jobs - Create executable jobs