Airflow Integration

The dagster-airflow library provides tools for:
  • Migrating existing Airflow DAGs to Dagster
  • Running Airflow tasks from within Dagster pipelines
  • Triggering Dagster runs from Airflow
  • Gradual migration from Airflow to Dagster
For comprehensive Airflow migration, consider using dagster-airlift which provides an incremental migration path with minimal changes to existing Airflow code.

Installation

pip install dagster-airflow

# Or install together with Apache Airflow itself
pip install dagster-airflow apache-airflow

Migration Approaches

1. Convert DAGs to Dagster Assets

Load Airflow DAGs as Dagster assets:
from dagster import AssetKey, Definitions
from dagster_airflow import load_assets_from_airflow_dag
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define your Airflow DAG
with DAG(
    dag_id="example_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    task_1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )
    task_2 = BashOperator(
        task_id="process_data",
        bash_command="echo 'Processing data'",
    )
    task_1 >> task_2

# Convert to Dagster assets
airflow_assets = load_assets_from_airflow_dag(
    dag=dag,
    task_ids_by_asset_key={
        AssetKey("date_output"): {"print_date"},
        AssetKey("processed_data"): {"process_data"},
    },
)

defs = Definitions(assets=airflow_assets)
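For a quick local check, the converted assets can be run in-process with Dagster's materialize helper. A sketch; depending on your dagster-airflow version, the converted assets may also require an airflow_db resource (described under Airflow Resources below):
from dagster import materialize
from dagster_airflow import make_ephemeral_airflow_db_resource

# Execute the converted assets in-process; the ephemeral Airflow DB
# backs the operators during the run (assumption: your version's
# converted assets consume an "airflow_db" resource)
result = materialize(
    airflow_assets,
    resources={"airflow_db": make_ephemeral_airflow_db_resource()},
)
assert result.success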

2. Load Multiple DAGs

Convert every DAG in an Airflow DAGs folder:
from airflow.models import Connection

from dagster_airflow import (
    make_dagster_definitions_from_airflow_dags_path,
)

# Load all DAGs from a directory, supplying any Airflow
# connections the tasks need
defs = make_dagster_definitions_from_airflow_dags_path(
    "/path/to/airflow/dags",
    connections=[
        Connection(conn_id="my_connection", conn_type="postgres", host="localhost"),
    ],
)

3. Load from DagBag

Convert DAGs from an Airflow DagBag:
from dagster_airflow import make_dagster_definitions_from_airflow_dag_bag
from airflow.models import DagBag

# Load from existing DagBag
dag_bag = DagBag(dag_folder="/path/to/dags")

defs = make_dagster_definitions_from_airflow_dag_bag(
    dag_bag=dag_bag,
)

Triggering Dagster from Airflow

DagsterOperator

Trigger Dagster jobs from Airflow:
from airflow import DAG
from dagster_airflow import DagsterOperator
from datetime import datetime

with DAG(
    dag_id="dagster_trigger_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    trigger_dagster = DagsterOperator(
        task_id="trigger_dagster_job",
        job_name="my_dagster_job",
        dagster_url="http://localhost:3000",
        repository_name="my_repository",
        repository_location_name="my_location",
    )

DagsterCloudOperator

Trigger Dagster Cloud jobs from Airflow:
from dagster_airflow import DagsterCloudOperator

trigger_cloud_job = DagsterCloudOperator(
    task_id="trigger_cloud_job",
    job_name="my_cloud_job",
    dagster_cloud_url="https://myorg.dagster.cloud/prod",
    user_token="your_user_token",
    repository_name="my_repository",
    repository_location_name="my_location",
)
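Rather than hardcoding user_token, you can pull it from an Airflow Variable or secrets backend. A minimal sketch, assuming a Variable named dagster_cloud_user_token has been set in Airflow:
from airflow.models import Variable
from dagster_airflow import DagsterCloudOperator

trigger_cloud_job = DagsterCloudOperator(
    task_id="trigger_cloud_job",
    job_name="my_cloud_job",
    dagster_cloud_url="https://myorg.dagster.cloud/prod",
    # Read the token from an Airflow Variable (hypothetical name)
    # instead of committing it to source control
    user_token=Variable.get("dagster_cloud_user_token"),
    repository_name="my_repository",
    repository_location_name="my_location",
)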

Airflow Resources

Use Airflow connections and resources within Dagster:
from dagster import Definitions, asset
from dagster_airflow import (
    make_ephemeral_airflow_db_resource,
    make_persistent_airflow_db_resource,
)

# Ephemeral database for testing (a throwaway Airflow metadata
# database is created for the run)
ephemeral_airflow_db = make_ephemeral_airflow_db_resource()

# Persistent database, pointed at an existing Airflow metadata DB
persistent_airflow_db = make_persistent_airflow_db_resource(
    uri="sqlite:////path/to/airflow.db",
)

@asset(required_resource_keys={"airflow_db"})
def my_asset(context):
    # Access Airflow connections through the resource
    conn = context.resources.airflow_db.get_connection("my_connection")
    # Use connection...
    return {"status": "success"}

defs = Definitions(
    assets=[my_asset],
    resources={"airflow_db": persistent_airflow_db},
)

Migration Patterns

Task Mapping

Map Airflow task dependencies to Dagster asset dependencies:
from datetime import datetime

from dagster import asset, AssetIn
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    return {"data": [1, 2, 3]}

def transform():
    # Process data
    pass

with DAG("etl_dag", start_date=datetime(2024, 1, 1)) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    extract_task >> transform_task

# Dagster equivalent with explicit dependencies
@asset
def extract_data():
    return {"data": [1, 2, 3]}

@asset(ins={"extract_data": AssetIn()})
def transform_data(extract_data):
    # Process data from extract_data
    return {"processed": True}

XCom to Asset Outputs

Convert Airflow XCom passing to Dagster asset dependencies:
# Airflow with XCom
def push_to_xcom(**context):
    context['ti'].xcom_push(key='result', value={'count': 100})

def pull_from_xcom(**context):
    result = context['ti'].xcom_pull(key='result')
    print(f"Count: {result['count']}")

# Dagster equivalent - natural data flow
@asset
def compute_count():
    return {"count": 100}

@asset
def process_count(compute_count):
    print(f"Count: {compute_count['count']}")
    return {"processed": True}

Scheduling

Convert Airflow schedules to Dagster schedules:
from dagster import ScheduleDefinition, Definitions

# Airflow: schedule_interval="@daily"
# Dagster equivalent:
daily_schedule = ScheduleDefinition(
    name="daily_etl",
    cron_schedule="0 0 * * *",
    target="*",
)

# Airflow: schedule_interval="0 */4 * * *"
every_four_hours = ScheduleDefinition(
    name="every_four_hours",
    cron_schedule="0 */4 * * *",
    target="*",
)

defs = Definitions(
    assets=[extract_data, transform_data],
    schedules=[daily_schedule, every_four_hours],
)
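To scope a schedule to a specific selection instead of target="*", you can attach it to an asset job; a sketch using define_asset_job with the asset names from above:
from dagster import ScheduleDefinition, define_asset_job

# Select only the ETL assets rather than every asset in the code location
etl_job = define_asset_job("etl_job", selection=["extract_data", "transform_data"])

daily_etl = ScheduleDefinition(
    job=etl_job,
    cron_schedule="0 0 * * *",
)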

Airflow Hooks Integration

Use DagsterHook to launch and monitor Dagster runs from Airflow:
from datetime import datetime

from dagster_airflow import DagsterHook
from airflow import DAG
from airflow.operators.python import PythonOperator

def send_to_dagster(**context):
    hook = DagsterHook(
        dagster_url="http://localhost:3000",
        repository_name="my_repo",
    )
    
    # Trigger a Dagster run
    run_id = hook.launch_run(
        job_name="my_job",
        run_config={"ops": {"my_op": {"config": {"param": "value"}}}},
    )
    
    # Wait for completion
    hook.wait_for_run(run_id)

with DAG("airflow_to_dagster", start_date=datetime(2024, 1, 1)) as dag:
    send_task = PythonOperator(
        task_id="send_to_dagster",
        python_callable=send_to_dagster,
    )

Add Dagster links to the Airflow UI:
from dagster_airflow import DagsterLink
from airflow.plugins_manager import AirflowPlugin

class DagsterAirflowPlugin(AirflowPlugin):
    name = "dagster_plugin"
    operator_extra_links = [DagsterLink()]

Incremental Migration with Airlift

For large Airflow deployments, use dagster-airlift for gradual migration:
pip install "dagster-airlift[core]"
from dagster import Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    load_airflow_dag_asset_specs,
)

# Point to the running Airflow instance
airflow_instance = AirflowInstance(
    name="my_airflow",
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8080",
        username="admin",
        password="admin",
    ),
)

# Load DAG asset specs for observation, without executing anything
dag_specs = load_airflow_dag_asset_specs(
    airflow_instance=airflow_instance,
    dag_selector_fn=lambda dag_info: dag_info.dag_id.startswith("etl_"),
)

defs = Definitions(assets=dag_specs)
With Airlift, you can:
  • Observe Airflow DAGs in Dagster without code changes
  • Incrementally migrate tasks one at a time (see the sketch after this list)
  • Maintain Airflow execution while building Dagster observability
  • Gradually transfer orchestration to Dagster
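For per-task migration, Airlift can map individual Airflow tasks to Dagster asset definitions. The following is a hedged sketch modeled on the airlift tutorial pattern; the DAG id, task id, and asset key are illustrative:
from dagster import AssetSpec, Definitions
from dagster_airlift.core import (
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)

# Reuses the airflow_instance constructed above
defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance,
    defs=Definitions(
        assets=assets_with_task_mappings(
            dag_id="etl_customers",  # illustrative DAG id
            task_mappings={
                # Task "load_customers" is represented by this asset
                "load_customers": [AssetSpec(key="raw_customers")],
            },
        ),
    ),
)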

Best Practices

  1. Start with observation: Use airlift to observe Airflow DAGs before migrating
  2. Migrate incrementally: Convert one DAG or task at a time
  3. Test thoroughly: Run migrated code in parallel with Airflow initially
  4. Use assets over ops: Prefer Dagster assets for better dependency management
  5. Leverage metadata: Convert Airflow variables and connections to Dagster resources (see the sketch after this list)
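As an example of practice 5, an Airflow Postgres connection can be re-expressed as a Dagster resource. A minimal sketch, assuming psycopg2 is installed; the resource name and fields are illustrative:
import psycopg2  # assumed driver dependency
from dagster import ConfigurableResource, Definitions, EnvVar, asset

class PostgresResource(ConfigurableResource):
    """Illustrative replacement for an Airflow Postgres connection."""

    host: str
    port: int = 5432
    username: str
    password: str
    database: str

    def get_connection(self):
        return psycopg2.connect(
            host=self.host,
            port=self.port,
            user=self.username,
            password=self.password,
            dbname=self.database,
        )

@asset
def orders(postgres: PostgresResource):
    # The resource is injected by Dagster, replacing Connection lookups
    with postgres.get_connection() as conn:
        ...

defs = Definitions(
    assets=[orders],
    resources={
        "postgres": PostgresResource(
            host="localhost",
            username="airflow",
            password=EnvVar("POSTGRES_PASSWORD"),
            database="analytics",
        )
    },
)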

Common Migration Challenges

Dynamic Task Generation

Airflow dynamic tasks need different approaches in Dagster:
# Airflow dynamic tasks
from airflow.decorators import task

@task
def dynamic_task_generator():
    return [1, 2, 3]

@task
def process_item(item):
    return item * 2

results = process_item.expand(item=dynamic_task_generator())

# Dagster equivalent - use multi-assets or graph composition
from dagster import multi_asset, AssetOut

@multi_asset(
    outs={
        "item_1": AssetOut(),
        "item_2": AssetOut(),
        "item_3": AssetOut(),
    }
)
def process_items():
    items = [1, 2, 3]
    return tuple(item * 2 for item in items)
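When the set of items is only known at runtime, a fixed multi_asset will not scale; Dagster's dynamic partitions are often a closer analogue to Airflow task mapping. A sketch, with the partition set name and item values illustrative:
from dagster import (
    AssetExecutionContext,
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    sensor,
)

# Hypothetical partition set; one partition per discovered item
items_partitions = DynamicPartitionsDefinition(name="items")

@asset(partitions_def=items_partitions)
def processed_item(context: AssetExecutionContext):
    # Each run handles exactly one dynamically registered item
    return int(context.partition_key) * 2

@sensor(asset_selection=[processed_item])
def discover_items(context):
    new_items = ["1", "2", "3"]  # e.g., listed from an external system
    return SensorResult(
        run_requests=[RunRequest(partition_key=key) for key in new_items],
        dynamic_partitions_requests=[items_partitions.build_add_request(new_items)],
    )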

Airflow Sensors vs. Dagster Sensors

Convert Airflow sensors to Dagster sensors:
# Airflow file sensor
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/path/to/file.csv",
)

# Dagster equivalent
from dagster import sensor, RunRequest, SensorEvaluationContext
import os

@sensor(job_name="process_file_job")
def file_sensor(context: SensorEvaluationContext):
    filepath = "/path/to/file.csv"
    if os.path.exists(filepath):
        # Include the mtime in the run key so a new run is requested
        # whenever the file changes, not just the first time it appears
        return RunRequest(run_key=f"{filepath}:{os.path.getmtime(filepath)}")

API Reference

Key functions:
  • load_assets_from_airflow_dag: Convert a single DAG to assets
  • make_dagster_definitions_from_airflow_dags_path: Load all DAGs from directory
  • make_dagster_definitions_from_airflow_dag_bag: Convert DagBag to Definitions
  • DagsterOperator: Airflow operator to trigger Dagster runs
  • DagsterCloudOperator: Trigger Dagster Cloud runs from Airflow
  • DagsterHook: Airflow hook for Dagster API interaction
For complete documentation, see the dagster-airflow API reference.
