Skip to main content
The @sensor decorator creates a sensor that can trigger runs based on custom logic. Sensors are continuously evaluated and can create run requests based on external state.

Signature

@sensor(
    job_name: Optional[str] = None,
    name: Optional[str] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    job: Optional[ExecutableDefinition] = None,
    jobs: Optional[Sequence[ExecutableDefinition]] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
    asset_selection: Optional[CoercibleToAssetSelection] = None,
    required_resource_keys: Optional[Set[str]] = None,
    tags: Optional[Mapping[str, str]] = None,
    metadata: Optional[RawMetadataMapping] = None,
    target: Optional[Union[
        CoercibleToAssetSelection,
        AssetsDefinition,
        JobDefinition,
        UnresolvedAssetJobDefinition,
    ]] = None,
    owners: Optional[Sequence[str]] = None,
) -> SensorDefinition

Parameters

name
Optional[str]
The name of the sensor. Defaults to the name of the decorated function.
minimum_interval_seconds
Optional[int]
The minimum number of seconds that will elapse between sensor evaluations.
description
Optional[str]
A human-readable description of the sensor.
job
Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]
The job to be executed when the sensor fires.
jobs
Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]
A list of jobs to be executed when the sensor fires.
default_status
DefaultSensorStatus
default:"DefaultSensorStatus.STOPPED"
Whether the sensor starts as running or not. The default status can be overridden from the Dagster UI or via the GraphQL API.
asset_selection
Optional[Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]]
An asset selection to launch a run for if the sensor condition is met. This can be provided instead of specifying a job.
required_resource_keys
Optional[Set[str]]
A set of resource keys that must be available on the context when the sensor evaluation function runs. Use this to specify resources your sensor function depends on.
tags
Optional[Mapping[str, str]]
A set of key-value tags that annotate the sensor and can be used for searching and filtering in the UI.
metadata
Optional[Mapping[str, object]]
A set of metadata entries that annotate the sensor. Values will be normalized to typed MetadataValue objects.
target
Optional[Union[CoercibleToAssetSelection, AssetsDefinition, JobDefinition, UnresolvedAssetJobDefinition]]
The target that the sensor will execute. It can take AssetSelection objects and anything coercible to it (e.g. str, Sequence[str], AssetKey, AssetsDefinition). It can also accept JobDefinition and UnresolvedAssetJobDefinition objects. This parameter will replace job, jobs, and asset_selection.
owners
Optional[Sequence[str]]
A sequence of strings identifying the owners of the sensor.

Returns

Type: SensorDefinition A sensor definition object.

Evaluation Function Return Values

The decorated function may return or yield:
  1. A RunRequest object
  2. A list of RunRequest objects
  3. A SkipReason object, providing a descriptive message of why no runs were requested
  4. Nothing (skipping without providing a reason)
  5. Yield a SkipReason or yield one or more RunRequest objects

Examples

Basic Sensor

from dagster import sensor, RunRequest, SkipReason, SensorEvaluationContext
import os

@sensor(job=my_job)
def my_sensor(context: SensorEvaluationContext):
    if os.path.exists("/tmp/trigger_file"):
        return RunRequest(run_key="trigger_run")
    else:
        return SkipReason("Trigger file not found")

Sensor with Run Configuration

from dagster import sensor, RunRequest, SensorEvaluationContext
import json

@sensor(job=my_job, minimum_interval_seconds=30)
def config_sensor(context: SensorEvaluationContext):
    with open("/tmp/config.json") as f:
        config_data = json.load(f)

    return RunRequest(
        run_key=f"run_{config_data['id']}",
        run_config={
            "ops": {
                "my_op": {
                    "config": {"value": config_data["value"]}
                }
            }
        },
    )

Sensor with Cursor (Tracking State)

from dagster import sensor, RunRequest, SensorEvaluationContext
import requests

@sensor(job=my_job)
def api_sensor(context: SensorEvaluationContext):
    # Get the last processed ID from the cursor
    last_id = int(context.cursor) if context.cursor else 0

    # Fetch new records from API
    response = requests.get(f"https://api.example.com/records?since={last_id}")
    records = response.json()

    if not records:
        return SkipReason("No new records")

    # Create a run request for each new record
    for record in records:
        yield RunRequest(
            run_key=str(record["id"]),
            run_config={"ops": {"process_record": {"config": record}}},
        )

    # Update cursor to the latest record ID
    context.update_cursor(str(records[-1]["id"]))

Sensor Targeting Assets

from dagster import sensor, RunRequest, AssetSelection, SensorEvaluationContext

@sensor(
    asset_selection=AssetSelection.groups("finance"),
    minimum_interval_seconds=60,
)
def finance_sensor(context: SensorEvaluationContext):
    # Check if finance data is ready
    if finance_data_is_ready():
        return RunRequest(
            run_key="finance_run",
            tags={"source": "finance_sensor"},
        )
    return SkipReason("Finance data not ready")

Sensor with Multiple Jobs

from dagster import sensor, RunRequest, SensorEvaluationContext

@sensor(jobs=[job_a, job_b])
def multi_job_sensor(context: SensorEvaluationContext):
    # Trigger different jobs based on conditions
    if should_run_job_a():
        yield RunRequest(
            run_key="job_a_run",
            job_name="job_a",
        )

    if should_run_job_b():
        yield RunRequest(
            run_key="job_b_run",
            job_name="job_b",
        )

Sensor with Resources

from dagster import sensor, RunRequest, SensorEvaluationContext

@sensor(
    job=my_job,
    required_resource_keys={"s3_client"},
)
def s3_sensor(context: SensorEvaluationContext):
    s3 = context.resources.s3_client
    new_files = s3.list_new_files("my-bucket")

    for file in new_files:
        yield RunRequest(
            run_key=file.name,
            run_config={
                "ops": {
                    "process_file": {"config": {"file_path": file.path}}
                }
            },
        )

Sensor with Tags and Metadata

from dagster import sensor, RunRequest, SensorEvaluationContext, MetadataValue

@sensor(
    job=my_job,
    tags={"team": "data-platform", "priority": "high"},
    metadata={
        "owner": "[email protected]",
        "documentation": MetadataValue.url("https://docs.example.com/sensors"),
    },
    owners=["team:data-platform"],
)
def documented_sensor(context: SensorEvaluationContext):
    return RunRequest(
        run_key="documented_run",
        tags={"triggered_by": "documented_sensor"},
    )

Sensor with Default Running Status

from dagster import sensor, RunRequest, DefaultSensorStatus

@sensor(
    job=my_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=300,
)
def always_on_sensor():
    # This sensor will be running by default
    if should_trigger():
        return RunRequest(run_key="auto_run")
  • @schedule - Time-based scheduling
  • @asset - Define software-defined assets
  • Jobs - Create executable jobs

Build docs developers (and LLMs) love