Airflow Integration
The dagster-airflow library provides tools for:
- Migrating existing Airflow DAGs to Dagster
- Running Airflow tasks from within Dagster pipelines
- Triggering Dagster runs from Airflow
- Gradual migration from Airflow to Dagster
For a comprehensive Airflow migration, consider dagster-airlift, which provides an incremental migration path with minimal changes to existing Airflow code.
Installation
pip install dagster-airflow
# Install with Airflow compatibility
pip install dagster-airflow apache-airflow
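Airflow itself is usually installed against a constraints file so its pinned dependencies stay consistent; for example (the version numbers here are illustrative):
pip install dagster-airflow "apache-airflow==2.7.3" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"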
Migration Approaches
1. Convert DAGs to Dagster Assets
Load Airflow DAGs as Dagster assets:
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from dagster import AssetKey, Definitions
from dagster_airflow import load_assets_from_airflow_dag

# Define your Airflow DAG
with DAG(
    dag_id="example_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    task_1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )
    task_2 = BashOperator(
        task_id="process_data",
        bash_command="echo 'Processing data'",
    )
    task_1 >> task_2

# Convert to Dagster assets; each AssetKey maps to the task IDs that produce it
airflow_assets = load_assets_from_airflow_dag(
    dag=dag,
    task_ids_by_asset_key={
        AssetKey("date_output"): {"print_date"},
        AssetKey("processed_data"): {"process_data"},
    },
)

# load_assets_from_airflow_dag returns a sequence of asset definitions
defs = Definitions(assets=airflow_assets)
2. Load Multiple DAGs
Convert every DAG in an Airflow DAGs folder:
from airflow.models import Connection
from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

# Load all DAGs from a directory; connections are Airflow Connection objects
defs = make_dagster_definitions_from_airflow_dags_path(
    "/path/to/airflow/dags",
    connections=[
        Connection(conn_id="my_connection", conn_type="postgres", host="localhost"),
    ],
)
3. Load from DagBag
Convert DAGs from an Airflow DagBag:
from dagster_airflow import make_dagster_definitions_from_airflow_dag_bag
from airflow.models import DagBag
# Load from existing DagBag
dag_bag = DagBag(dag_folder="/path/to/dags")
defs = make_dagster_definitions_from_airflow_dag_bag(
    dag_bag=dag_bag,
)
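Each converted DAG surfaces as a Dagster job. Assuming the folder contains a DAG with dag_id etl_dag, and that the generated job keeps that name (naming can differ between dagster-airflow versions), you can run it in-process as a quick check:
# Look up the generated job and execute it synchronously
etl_job = defs.get_job_def("etl_dag")
result = etl_job.execute_in_process()
assert result.success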
Airflow Operators in Dagster
DagsterOperator
Trigger Dagster jobs from Airflow:
from airflow import DAG
from dagster_airflow import DagsterOperator
from datetime import datetime
with DAG(
    dag_id="dagster_trigger_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    trigger_dagster = DagsterOperator(
        task_id="trigger_dagster_job",
        job_name="my_dagster_job",
        dagster_url="http://localhost:3000",
        repository_name="my_repository",
        repository_location_name="my_location",
    )
DagsterCloudOperator
Trigger Dagster Cloud jobs from Airflow:
from dagster_airflow import DagsterCloudOperator
trigger_cloud_job = DagsterCloudOperator(
task_id="trigger_cloud_job",
job_name="my_cloud_job",
dagster_cloud_url="https://myorg.dagster.cloud/prod",
user_token="your_user_token",
repository_name="my_repository",
repository_location_name="my_location",
)
Airflow Resources
Use Airflow connections and resources within Dagster:
from dagster import Definitions, asset
from dagster_airflow import (
    make_ephemeral_airflow_db_resource,
    make_persistent_airflow_db_resource,
)

# Ephemeral database for testing (created fresh and discarded per run)
ephemeral_airflow_db = make_ephemeral_airflow_db_resource()

# Persistent database backed by an existing Airflow metadata DB
persistent_airflow_db = make_persistent_airflow_db_resource(
    uri="postgresql+psycopg2://airflow:airflow@localhost:5432/airflow",
)

@asset(required_resource_keys={"airflow_db"})
def my_asset(context):
    # Access the Airflow metadata database through the resource
    airflow_db = context.resources.airflow_db
    # Use Airflow connections, variables, etc. stored in the database...
    return {"status": "success"}

defs = Definitions(
    assets=[my_asset],
    resources={"airflow_db": persistent_airflow_db},
)
Migration Patterns
Task Mapping
Map Airflow task dependencies to Dagster asset dependencies:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from dagster import AssetIn, asset

def extract():
    return {"data": [1, 2, 3]}

def transform():
    # Process data
    pass

with DAG("etl_dag", start_date=datetime(2024, 1, 1)) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    extract_task >> transform_task

# Dagster equivalent with explicit dependencies
@asset
def extract_data():
    return {"data": [1, 2, 3]}

@asset(ins={"extract_data": AssetIn()})
def transform_data(extract_data):
    # Process the output of extract_data
    return {"processed": True}
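To check that the migrated assets reproduce the DAG's behavior, you can materialize them in-process with Dagster's materialize helper before wiring up any schedules:
from dagster import materialize

# Runs both assets in dependency order within the current process
result = materialize([extract_data, transform_data])
assert result.success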
Convert Airflow XCom passing to Dagster asset dependencies:
# Airflow with XCom
def push_to_xcom(**context):
    context['ti'].xcom_push(key='result', value={'count': 100})

def pull_from_xcom(**context):
    result = context['ti'].xcom_pull(key='result')
    print(f"Count: {result['count']}")

# Dagster equivalent - natural data flow between assets
@asset
def compute_count():
    return {"count": 100}

@asset
def process_count(compute_count):
    print(f"Count: {compute_count['count']}")
    return {"processed": True}
Scheduling
Convert Airflow schedules to Dagster schedules:
from dagster import Definitions, ScheduleDefinition

# Airflow: schedule_interval="@daily"
# Dagster equivalent:
daily_schedule = ScheduleDefinition(
    name="daily_etl",
    cron_schedule="0 0 * * *",
    target="*",
)

# Airflow: schedule_interval="0 */4 * * *"
every_four_hours = ScheduleDefinition(
    name="every_four_hours",
    cron_schedule="0 */4 * * *",
    target="*",
)

defs = Definitions(
    assets=[extract_data, transform_data],
    schedules=[daily_schedule, every_four_hours],
)
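Airflow's execution_date and catchup semantics usually map to Dagster partitions plus backfills rather than to a schedule alone. A minimal sketch with daily partitions (the asset name is illustrative):
from dagster import DailyPartitionsDefinition, asset

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def daily_table(context):
    # context.partition_key plays the role of Airflow's logical (execution) date
    date_str = context.partition_key
    ...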
Airflow Hooks Integration
Use DagsterHook to send data to Dagster from Airflow:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from dagster_airflow import DagsterHook

def send_to_dagster(**context):
    hook = DagsterHook(
        dagster_url="http://localhost:3000",
        repository_name="my_repo",
    )
    # Trigger a Dagster run
    run_id = hook.launch_run(
        job_name="my_job",
        run_config={"ops": {"my_op": {"config": {"param": "value"}}}},
    )
    # Wait for completion
    hook.wait_for_run(run_id)

with DAG("airflow_to_dagster", start_date=datetime(2024, 1, 1)) as dag:
    send_task = PythonOperator(
        task_id="send_to_dagster",
        python_callable=send_to_dagster,
    )
Airflow Links
Add Dagster links to Airflow UI:
from airflow.plugins_manager import AirflowPlugin
from dagster_airflow import DagsterLink

class DagsterAirflowPlugin(AirflowPlugin):
    name = "dagster_plugin"
    operator_extra_links = [DagsterLink()]
Incremental Migration with Airlift
For large Airflow deployments, use dagster-airlift for gradual migration:
pip install dagster-airlift
from dagster import Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    load_airflow_dag_asset_specs,
)

# Point to the running Airflow instance
airflow_instance = AirflowInstance(
    name="my_airflow",
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8080",
        username="admin",
        password="admin",
    ),
)

# Load DAG specs without executing them
dag_specs = load_airflow_dag_asset_specs(
    airflow_instance=airflow_instance,
    dag_selector_fn=lambda dag_info: dag_info.dag_id.startswith("etl_"),
)

defs = Definitions(assets=dag_specs)
With Airlift, you can:
- Observe Airflow DAGs in Dagster without code changes
- Incrementally migrate tasks one at a time
- Maintain Airflow execution while building Dagster observability
- Gradually transfer orchestration to Dagster
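For the observation step, airlift can also build Definitions directly from a running instance without selecting individual DAGs; a sketch, with illustrative credentials:
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_defs_from_airflow_instance,
)

# Represent every DAG in the instance as observed (not executed) assets
defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        name="my_airflow",
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
    ),
)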
Best Practices
- Start with observation: Use airlift to observe Airflow DAGs before migrating
- Migrate incrementally: Convert one DAG or task at a time
- Test thoroughly: Run migrated code in parallel with Airflow initially
- Use assets over ops: Prefer Dagster assets for better dependency management
- Leverage metadata: Convert Airflow variables and connections to Dagster resources, as sketched below
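As a sketch of that last practice, an Airflow Connection/Variable pair can become a typed Dagster resource (the resource name and fields here are hypothetical):
from dagster import ConfigurableResource, Definitions, EnvVar

# Hypothetical resource replacing an Airflow Connection plus a Variable
class WarehouseResource(ConfigurableResource):
    host: str
    api_key: str

defs = Definitions(
    resources={
        "warehouse": WarehouseResource(
            host="localhost",
            # Replaces Variable.get("warehouse_api_key"); read from the environment
            api_key=EnvVar("WAREHOUSE_API_KEY"),
        ),
    },
)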
Common Migration Challenges
Dynamic Task Generation
Airflow dynamic tasks need different approaches in Dagster:
# Airflow dynamic task mapping
from airflow.decorators import task

@task
def dynamic_task_generator():
    return [1, 2, 3]

@task
def process_item(item):
    return item * 2

results = process_item.expand(item=dynamic_task_generator())

# Dagster equivalent - use multi-assets or graph composition
from dagster import AssetOut, multi_asset

@multi_asset(
    outs={
        "item_1": AssetOut(),
        "item_2": AssetOut(),
        "item_3": AssetOut(),
    }
)
def process_items():
    items = [1, 2, 3]
    return tuple(item * 2 for item in items)
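When the number of items is only known at runtime, Dagster's dynamic outputs are the closer analogue to Airflow's .expand(). A minimal sketch using DynamicOut (the op and graph names are illustrative):
from dagster import DynamicOut, DynamicOutput, graph, op

@op(out=DynamicOut())
def emit_items():
    # One dynamic output per item; mapping_key identifies each branch
    for index, item in enumerate([1, 2, 3]):
        yield DynamicOutput(item, mapping_key=str(index))

@op
def double(item: int) -> int:
    return item * 2

@graph
def process_all_items():
    # Fan out: double runs once per dynamic output
    emit_items().map(double)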
Airflow Sensors vs. Dagster Sensors
Convert Airflow sensors to Dagster sensors:
# Airflow file sensor
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/path/to/file.csv",
)

# Dagster equivalent
import os

from dagster import RunRequest, SensorEvaluationContext, sensor

@sensor(job_name="process_file_job")
def file_sensor(context: SensorEvaluationContext):
    filepath = "/path/to/file.csv"
    if os.path.exists(filepath):
        return RunRequest(run_key=filepath)
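Dagster deduplicates run requests by run_key, so the sensor above fires at most once for a given path. To trigger again whenever the file changes, fold the modification time into the key (a small variation on the example above):
@sensor(job_name="process_file_job")
def file_changed_sensor(context: SensorEvaluationContext):
    filepath = "/path/to/file.csv"
    if os.path.exists(filepath):
        # A new mtime produces a new run_key, so an updated file triggers a new run
        mtime = os.path.getmtime(filepath)
        return RunRequest(run_key=f"{filepath}:{mtime}")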
API Reference
Key functions:
- load_assets_from_airflow_dag: Convert a single DAG to assets
- make_dagster_definitions_from_airflow_dags_path: Load all DAGs from a directory
- make_dagster_definitions_from_airflow_dag_bag: Convert a DagBag to Definitions
- DagsterOperator: Airflow operator to trigger Dagster runs
- DagsterCloudOperator: Trigger Dagster Cloud runs from Airflow
- DagsterHook: Airflow hook for Dagster API interaction
For complete documentation, see dagster-airflow API reference.
Next Steps