
Databricks Integration

The dagster-databricks library enables you to:
  • Submit and monitor Databricks jobs from Dagster
  • Execute Spark code on Databricks clusters
  • Use Databricks SQL for analytics
  • Integrate with Delta Lake tables
  • Stream logs and metrics from Databricks to Dagster

Installation

pip install dagster-databricks

# Install with Databricks SDK
pip install dagster-databricks databricks-sdk

Quick Start

Execute a Databricks job from Dagster:
from dagster import asset, Definitions, OpExecutionContext
from dagster_databricks import DatabricksClientResource

@asset
def databricks_job(context: OpExecutionContext, databricks: DatabricksClientResource):
    # Submit job to Databricks
    job_run = databricks.submit_run(
        run_name="dagster_pipeline_run",
        tasks=[
            {
                "task_key": "main_task",
                "new_cluster": {
                    "spark_version": "13.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2,
                },
                "spark_python_task": {
                    "python_file": "dbfs:/scripts/etl_job.py",
                    "parameters": ["--date", "2024-01-01"],
                },
            }
        ],
    )
    
    # Wait for completion
    job_run.wait_for_completion()
    
    context.log.info(f"Databricks job completed: {job_run.run_id}")
    return {"run_id": job_run.run_id}

defs = Definitions(
    assets=[databricks_job],
    resources={
        "databricks": DatabricksClientResource(
            host="https://my-workspace.cloud.databricks.com",
            token={"env": "DATABRICKS_TOKEN"},
        )
    },
)

Databricks Client Resource

The DatabricksClientResource provides access to Databricks APIs:
from dagster_databricks import DatabricksClientResource

databricks = DatabricksClientResource(
    host="https://my-workspace.cloud.databricks.com",
    token={"env": "DATABRICKS_TOKEN"},
    # Or use username/password
    # username={"env": "DATABRICKS_USERNAME"},
    # password={"env": "DATABRICKS_PASSWORD"},
)
Configuration options:
  • host: Databricks workspace URL
  • token: Personal access token or service principal token
  • username: Username (alternative to token)
  • password: Password (alternative to token)
  • cluster_id: Default cluster ID for jobs
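On versions where DatabricksClientResource is a Pythonic configurable resource, the same credentials can be supplied with Dagster's EnvVar instead of the {"env": ...} dict. A sketch, assuming EnvVar is supported by your installed version:
from dagster import EnvVar
from dagster_databricks import DatabricksClientResource

# EnvVar defers reading the secret until runtime, keeping it out of code and config files
databricks = DatabricksClientResource(
    host="https://my-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)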

Dagster Pipes with Databricks

Use Dagster Pipes for streaming logs and structured metadata from Databricks:
import os

from dagster import asset, AssetExecutionContext, Definitions
from dagster_databricks import PipesDatabricksClient
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

@asset
def databricks_pipes_asset(
    context: AssetExecutionContext,
    pipes_databricks: PipesDatabricksClient,
):
    # Define task configuration
    task = jobs.SubmitTask.from_dict(
        {
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
            "libraries": [
                # Include dagster-pipes in task environment
                {"pypi": {"package": "dagster-pipes"}},
            ],
            "task_key": "pipes-task",
            "spark_python_task": {
                "python_file": "dbfs:/my_python_script.py",
                "source": jobs.Source.WORKSPACE,
            },
        }
    )
    
    # Run with Pipes - streams logs and metrics
    return pipes_databricks.run(
        task=task,
        context=context,
        extras={"date": "2024-01-01"},
    ).get_materialize_result()

defs = Definitions(
    assets=[databricks_pipes_asset],
    resources={
        "pipes_databricks": PipesDatabricksClient(
            client=WorkspaceClient(
                host="https://my-workspace.cloud.databricks.com",
                token=os.getenv("DATABRICKS_TOKEN"),
            )
        )
    },
)
Databricks script (dbfs:/my_python_script.py):
from dagster_pipes import (
    PipesDbfsContextLoader,
    PipesDbfsMessageWriter,
    open_dagster_pipes,
)

# Open a Pipes session; on Databricks, context and messages are exchanged over DBFS
with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    # Log messages back to Dagster
    pipes.log.info("Starting Databricks computation")
    
    # Access extras passed from Dagster
    date = pipes.get_extra("date")
    
    # Perform computation
    result_count = 1000  # Your computation here
    
    # Report structured metadata
    pipes.report_asset_materialization(
        metadata={"row_count": result_count},
    )
    
    pipes.log.info(f"Processed {result_count} rows")
Dagster Pipes provides bidirectional communication between Dagster and Databricks, streaming logs, metrics, and metadata back to Dagster in real time.
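The script can also report richer results back to Dagster. A sketch of reporting a data-quality check alongside the materialization; the non_empty check is hypothetical and would need to be declared on the Dagster-side asset via check_specs:
from dagster_pipes import (
    PipesDbfsContextLoader,
    PipesDbfsMessageWriter,
    open_dagster_pipes,
)

with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    row_count = 1000  # your computation here

    # Attach structured metadata to the materialization
    pipes.report_asset_materialization(metadata={"row_count": row_count})

    # Report a data-quality check result for the asset
    pipes.report_asset_check(
        check_name="non_empty",
        passed=row_count > 0,
        metadata={"row_count": row_count},
    )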

Running Existing Databricks Jobs

Trigger pre-configured Databricks jobs:
from dagster import asset, OpExecutionContext
from dagster_databricks import DatabricksClientResource

@asset
def trigger_existing_job(
    context: OpExecutionContext,
    databricks: DatabricksClientResource,
):
    # Run existing job by ID
    job_id = 123456
    
    run = databricks.run_now(
        job_id=job_id,
        notebook_params={"date": "2024-01-01"},
    )
    
    # Wait for completion
    run.wait_for_completion()
    
    context.log.info(f"Job {job_id} completed successfully")
    return {"job_id": job_id, "run_id": run.run_id}

Creating Ops for Databricks Jobs

Create reusable ops for Databricks jobs:
from dagster import Definitions, job
from dagster_databricks import (
    create_databricks_run_now_op,
    create_databricks_submit_run_op,
    DatabricksClientResource,
)

# Op that triggers existing job
run_existing_job = create_databricks_run_now_op(
    name="run_analytics_job",
    databricks_job_id=123456,
)

# Op that submits new job
submit_etl_job = create_databricks_submit_run_op(
    name="submit_etl",
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
        "notebook_task": {
            "notebook_path": "/Users/me/etl_notebook",
            "base_parameters": {"env": "prod"},
        },
    },
)

# Ops must run inside a job rather than being registered as assets
@job
def databricks_ops_job():
    run_existing_job()
    submit_etl_job()

defs = Definitions(
    jobs=[databricks_ops_job],
    resources={
        "databricks": DatabricksClientResource(
            host="https://my-workspace.cloud.databricks.com",
            token={"env": "DATABRICKS_TOKEN"},
        )
    },
)

Databricks SQL

Execute SQL queries on Databricks SQL warehouses:
import os

from dagster import asset, OpExecutionContext
from databricks import sql

@asset
def databricks_sql_query(context: OpExecutionContext):
    # Connect to SQL warehouse
    connection = sql.connect(
        server_hostname="my-workspace.cloud.databricks.com",
        http_path="/sql/1.0/warehouses/abc123",
        access_token=os.getenv("DATABRICKS_TOKEN"),
    )
    
    cursor = connection.cursor()
    cursor.execute("""
        SELECT product, SUM(revenue) as total_revenue
        FROM sales
        WHERE date >= CURRENT_DATE - INTERVAL 7 DAYS
        GROUP BY product
        ORDER BY total_revenue DESC
        LIMIT 10
    """)
    
    results = cursor.fetchall()
    context.log.info(f"Query returned {len(results)} rows")
    
    cursor.close()
    connection.close()
    
    return {"top_products": [row.asDict() for row in results]}

Delta Lake Integration

Work with Delta Lake tables:
from dagster import asset
from pyspark.sql import SparkSession

@asset
def delta_table_reader():
    spark = SparkSession.builder.getOrCreate()
    
    # Read Delta table
    df = spark.read.format("delta").load(
        "dbfs:/mnt/delta/sales"
    )
    
    # Perform operations
    filtered_df = df.filter(df.amount > 100)
    
    # Write to new Delta table
    filtered_df.write.format("delta").mode("overwrite").save(
        "dbfs:/mnt/delta/high_value_sales"
    )
    
    return {"row_count": filtered_df.count()}
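Delta time travel lets you read an earlier version of a table, which is useful for audits and backfills. A small sketch (the version number and path are illustrative):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the table as of version 0; "timestampAsOf" works the same way for a point in time
previous_df = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("dbfs:/mnt/delta/sales")
)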

Databricks Asset Bundles

Integrate Databricks Asset Bundles (DABs):
from dagster_databricks import DatabricksAssetBundleComponent

# Load assets from Databricks Asset Bundle
databricks_bundle = DatabricksAssetBundleComponent(
    bundle_path="/path/to/bundle",
    target="production",
)

PySpark Step Launcher

Run individual ops on Databricks clusters:
from dagster import job, op, Definitions
from dagster_databricks import DatabricksPySparkStepLauncher

@op(required_resource_keys={"step_launcher"})
def spark_computation():
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("dbfs:/data/input")
    
    result = df.count()
    return result

@job(
    resource_defs={
        "step_launcher": DatabricksPySparkStepLauncher(
            databricks_host="https://my-workspace.cloud.databricks.com",
            databricks_token={"env": "DATABRICKS_TOKEN"},
            cluster_id="1234-567890-abc123",
        )
    }
)
def databricks_pyspark_job():
    spark_computation()

defs = Definitions(jobs=[databricks_pyspark_job])

Authentication

Multiple authentication methods are supported:

Personal Access Token

DatabricksClientResource(
    host="https://my-workspace.cloud.databricks.com",
    token={"env": "DATABRICKS_TOKEN"},
)

Service Principal (OAuth)

DatabricksClientResource(
    host="https://my-workspace.cloud.databricks.com",
    client_id={"env": "DATABRICKS_CLIENT_ID"},
    client_secret={"env": "DATABRICKS_CLIENT_SECRET"},
)

Username/Password

DatabricksClientResource(
    host="https://my-workspace.cloud.databricks.com",
    username={"env": "DATABRICKS_USERNAME"},
    password={"env": "DATABRICKS_PASSWORD"},
)
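
Databricks SDK Unified Authentication

When constructing a WorkspaceClient for PipesDatabricksClient, you can also rely on the Databricks SDK's unified authentication, which resolves credentials from the environment or a configured profile instead of explicit arguments. A sketch, assuming DATABRICKS_HOST and DATABRICKS_TOKEN are set:
from databricks.sdk import WorkspaceClient
from dagster_databricks import PipesDatabricksClient

# With no arguments, WorkspaceClient() uses the SDK's unified auth chain
# (environment variables such as DATABRICKS_HOST / DATABRICKS_TOKEN,
# or a profile in ~/.databrickscfg)
pipes_databricks = PipesDatabricksClient(client=WorkspaceClient())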

Best Practices

  1. Use Pipes: Prefer PipesDatabricksClient for better observability and log streaming
  2. Cluster sizing: Right-size clusters for the workload; use autoscaling when appropriate
  3. Spot instances: Use spot instances for cost savings on fault-tolerant workloads (see the cluster spec sketch after this list)
  4. Delta Lake: Store outputs in Delta format for ACID transactions and time travel
  5. Job pooling: Reuse clusters with job pools to reduce startup time
  6. Secrets: Use Databricks Secrets for credential management
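A minimal sketch of a new_cluster spec applying the autoscaling and spot-instance recommendations above (AWS attribute names shown; node type and worker counts are illustrative):
new_cluster = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    # Autoscale between 2 and 8 workers instead of fixing num_workers
    "autoscale": {"min_workers": 2, "max_workers": 8},
    # Spot instances with an on-demand driver and automatic fallback to on-demand
    "aws_attributes": {
        "availability": "SPOT_WITH_FALLBACK",
        "first_on_demand": 1,
    },
}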

Monitoring and Observability

Track Databricks job metrics in Dagster:
from dagster import asset, OpExecutionContext, MetadataValue
from dagster_databricks import DatabricksClientResource

@asset
def monitored_databricks_job(
    context: OpExecutionContext,
    databricks: DatabricksClientResource,
):
    # `config` is a task/run configuration like the one in the Quick Start example
    run = databricks.submit_run(config)
    run.wait_for_completion()
    
    # Fetch job metrics
    run_info = databricks.get_run(run.run_id)
    
    # Report to Dagster
    context.add_output_metadata(
        {
            "run_id": run.run_id,
            "cluster_id": run_info.cluster_instance.cluster_id,
            "execution_duration": run_info.execution_duration,
            "run_page_url": MetadataValue.url(run_info.run_page_url),
        }
    )
    
    return {"status": "success"}

Troubleshooting

Authentication failures

Verify token has correct permissions:
curl -H "Authorization: Bearer $DATABRICKS_TOKEN" \
  https://my-workspace.cloud.databricks.com/api/2.0/clusters/list

Cluster startup timeout

Increase timeout or use existing all-purpose cluster:
DatabricksClientResource(
    host="...",
    token={"env": "DATABRICKS_TOKEN"},
    cluster_id="existing-cluster-id",  # Use running cluster
)

Library installation errors

Ensure libraries are compatible with Databricks runtime:
"libraries": [
    {"pypi": {"package": "dagster-pipes==1.8.0"}},  # Pin version
]

API Reference

Key components:
  • DatabricksClientResource: Client for Databricks APIs
  • PipesDatabricksClient: Execute with Dagster Pipes
  • create_databricks_run_now_op: Trigger existing job
  • create_databricks_submit_run_op: Submit new job
  • DatabricksPySparkStepLauncher: Run ops on Databricks
  • DatabricksAssetBundleComponent: Load DABs
For complete documentation, see the dagster-databricks API reference.
