Databricks Integration
The dagster-databricks library enables you to:
- Submit and monitor Databricks jobs from Dagster
- Execute Spark code on Databricks clusters
- Use Databricks SQL for analytics
- Integrate with Delta Lake tables
- Stream logs and metrics from Databricks to Dagster
Installation
pip install dagster-databricks
# Install with Databricks SDK
pip install dagster-databricks databricks-sdk
Quick Start
Execute a Databricks job from Dagster:
from dagster import AssetExecutionContext, Definitions, EnvVar, asset
from dagster_databricks import DatabricksClientResource
from databricks.sdk.service import jobs

@asset
def databricks_job(context: AssetExecutionContext, databricks: DatabricksClientResource):
    # Submit a one-time run to Databricks through the SDK workspace client
    workspace = databricks.get_client().workspace_client
    waiter = workspace.jobs.submit(
        run_name="dagster_pipeline_run",
        tasks=[
            jobs.SubmitTask.from_dict(
                {
                    "task_key": "main_task",
                    "new_cluster": {
                        "spark_version": "13.3.x-scala2.12",
                        "node_type_id": "i3.xlarge",
                        "num_workers": 2,
                    },
                    "spark_python_task": {
                        "python_file": "dbfs:/scripts/etl_job.py",
                        "parameters": ["--date", "2024-01-01"],
                    },
                }
            )
        ],
    )
    # Block until the run finishes
    run = waiter.result()
    context.log.info(f"Databricks job completed: {run.run_id}")
    return {"run_id": run.run_id}

defs = Definitions(
    assets=[databricks_job],
    resources={
        "databricks": DatabricksClientResource(
            host="https://my-workspace.cloud.databricks.com",
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    },
)
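For local testing, the asset can be materialized directly from Python; a minimal sketch reusing the asset and resource defined above:
from dagster import materialize

# One-off materialization of the asset defined above
result = materialize(
    [databricks_job],
    resources={
        "databricks": DatabricksClientResource(
            host="https://my-workspace.cloud.databricks.com",
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    },
)
assert result.success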
Databricks Client Resource
The DatabricksClientResource provides access to Databricks APIs:
from dagster import EnvVar
from dagster_databricks import DatabricksClientResource

databricks = DatabricksClientResource(
    host="https://my-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
    # Or authenticate as an OAuth service principal instead of a token:
    # oauth_credentials=OauthCredentials(
    #     client_id=EnvVar("DATABRICKS_CLIENT_ID"),
    #     client_secret=EnvVar("DATABRICKS_CLIENT_SECRET"),
    # ),
)
Configuration options:
host: Databricks workspace URL
token: Personal access token (omit when using OAuth credentials)
oauth_credentials: OAuth client ID and secret for a service principal (alternative to token)
workspace_id: Databricks workspace ID (optional)
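Inside an asset or op, the resource's underlying Databricks SDK client can be used directly; a minimal sketch, assuming the resource exposes it via get_client().workspace_client (the asset name is illustrative):
from dagster import AssetExecutionContext, asset
from dagster_databricks import DatabricksClientResource

@asset
def cluster_inventory(context: AssetExecutionContext, databricks: DatabricksClientResource):
    # get_client() returns a client wrapping the databricks-sdk WorkspaceClient
    workspace = databricks.get_client().workspace_client
    clusters = list(workspace.clusters.list())
    context.log.info(f"Found {len(clusters)} clusters")
    return [c.cluster_name for c in clusters]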
Dagster Pipes with Databricks
Use Dagster Pipes for streaming logs and structured metadata from Databricks:
import os

from dagster import AssetExecutionContext, Definitions, asset
from dagster_databricks import PipesDatabricksClient
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

@asset
def databricks_pipes_asset(
    context: AssetExecutionContext,
    pipes_databricks: PipesDatabricksClient,
):
    # Define task configuration
    task = jobs.SubmitTask.from_dict(
        {
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
            "libraries": [
                # Include dagster-pipes in the task environment
                {"pypi": {"package": "dagster-pipes"}},
            ],
            "task_key": "pipes-task",
            "spark_python_task": {
                "python_file": "dbfs:/my_python_script.py",
                "source": jobs.Source.WORKSPACE,
            },
        }
    )
    # Run with Pipes - streams logs and metrics back to Dagster
    return pipes_databricks.run(
        task=task,
        context=context,
        extras={"date": "2024-01-01"},
    ).get_materialize_result()

defs = Definitions(
    assets=[databricks_pipes_asset],
    resources={
        "pipes_databricks": PipesDatabricksClient(
            client=WorkspaceClient(
                host="https://my-workspace.cloud.databricks.com",
                token=os.getenv("DATABRICKS_TOKEN"),
            )
        )
    },
)
Databricks script (dbfs:/my_python_script.py):
from dagster_pipes import (
    PipesDbfsContextLoader,
    PipesDbfsMessageWriter,
    open_dagster_pipes,
)

# Open a Pipes session; the DBFS loader/writer match PipesDatabricksClient defaults
with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    # Log messages back to Dagster
    pipes.log.info("Starting Databricks computation")
    # Access extras passed from Dagster
    date = pipes.get_extra("date")
    # Perform computation
    result_count = 1000  # Your computation here
    # Report structured metadata
    pipes.report_asset_materialization(
        metadata={"row_count": result_count},
    )
    pipes.log.info(f"Processed {result_count} rows")
Dagster Pipes provides bi-directional communication between Dagster and Databricks, streaming logs, metrics, and metadata in real time.
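The external process can also report asset check results back to Dagster; a minimal sketch of the Databricks-side script, assuming a check named row_count_nonzero has been declared on the asset (for example via check_specs):
from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, open_dagster_pipes

with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    row_count = 1000  # your computation here
    # "row_count_nonzero" is an illustrative check name
    pipes.report_asset_check(
        check_name="row_count_nonzero",
        passed=row_count > 0,
        metadata={"row_count": row_count},
    )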
Running Existing Databricks Jobs
Trigger pre-configured Databricks jobs:
from dagster import AssetExecutionContext, asset
from dagster_databricks import DatabricksClientResource

@asset
def trigger_existing_job(
    context: AssetExecutionContext,
    databricks: DatabricksClientResource,
):
    workspace = databricks.get_client().workspace_client
    # Trigger an existing job by ID
    job_id = 123456
    waiter = workspace.jobs.run_now(
        job_id=job_id,
        notebook_params={"date": "2024-01-01"},
    )
    # Block until the run finishes
    run = waiter.result()
    context.log.info(f"Job {job_id} completed successfully")
    return {"job_id": job_id, "run_id": run.run_id}
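When the asset is partitioned, the partition key can be forwarded to Databricks instead of a hard-coded date; a minimal sketch with a daily partitions definition (the job ID and parameter name are illustrative):
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
from dagster_databricks import DatabricksClientResource

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def partitioned_databricks_job(
    context: AssetExecutionContext,
    databricks: DatabricksClientResource,
):
    workspace = databricks.get_client().workspace_client
    # Forward the partition date to the notebook as a parameter
    waiter = workspace.jobs.run_now(
        job_id=123456,
        notebook_params={"date": context.partition_key},
    )
    waiter.result()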
Creating Ops with create_databricks_run_now_op and create_databricks_submit_run_op
Create reusable ops for Databricks jobs:
from dagster import Definitions, EnvVar, job
from dagster_databricks import (
    create_databricks_run_now_op,
    create_databricks_submit_run_op,
    DatabricksClientResource,
)

# Op that triggers an existing job
run_existing_job = create_databricks_run_now_op(
    name="run_analytics_job",
    databricks_job_id=123456,
)

# Op that submits a one-time run
submit_etl_job = create_databricks_submit_run_op(
    name="submit_etl",
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
        "notebook_task": {
            "notebook_path": "/Users/me/etl_notebook",
            "base_parameters": {"env": "prod"},
        },
    },
)

# Ops run inside a job, not as assets
@job
def databricks_ops_job():
    run_existing_job()
    submit_etl_job()

defs = Definitions(
    jobs=[databricks_ops_job],
    resources={
        "databricks": DatabricksClientResource(
            host="https://my-workspace.cloud.databricks.com",
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    },
)
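To run these ops on a cadence, attach a schedule to the job defined above; a minimal sketch (the cron string is illustrative):
from dagster import ScheduleDefinition

# Add to the Definitions above via schedules=[nightly_databricks]
nightly_databricks = ScheduleDefinition(
    job=databricks_ops_job,     # the job defined above
    cron_schedule="0 2 * * *",  # 02:00 every day
)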
Databricks SQL
Execute SQL queries on Databricks SQL warehouses:
import os

from dagster import AssetExecutionContext, asset
from databricks import sql

@asset
def databricks_sql_query(context: AssetExecutionContext):
    # Connect to a SQL warehouse
    connection = sql.connect(
        server_hostname="my-workspace.cloud.databricks.com",
        http_path="/sql/1.0/warehouses/abc123",
        access_token=os.getenv("DATABRICKS_TOKEN"),
    )
    cursor = connection.cursor()
    cursor.execute("""
        SELECT product, SUM(revenue) AS total_revenue
        FROM sales
        WHERE date >= CURRENT_DATE - INTERVAL 7 DAYS
        GROUP BY product
        ORDER BY total_revenue DESC
        LIMIT 10
    """)
    results = cursor.fetchall()
    context.log.info(f"Query returned {len(results)} rows")
    cursor.close()
    connection.close()
    return {"top_products": [row.asDict() for row in results]}
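To avoid repeating connection details across assets, the connection can be wrapped in a small Dagster resource; a minimal sketch (DatabricksSQLResource is an illustrative helper, not part of dagster-databricks):
from contextlib import contextmanager

from dagster import ConfigurableResource, EnvVar
from databricks import sql

class DatabricksSQLResource(ConfigurableResource):
    """Illustrative helper wrapping the databricks-sql-connector."""

    server_hostname: str
    http_path: str
    access_token: str

    @contextmanager
    def get_connection(self):
        # Open a connection to the SQL warehouse and close it when done
        conn = sql.connect(
            server_hostname=self.server_hostname,
            http_path=self.http_path,
            access_token=self.access_token,
        )
        try:
            yield conn
        finally:
            conn.close()

warehouse = DatabricksSQLResource(
    server_hostname="my-workspace.cloud.databricks.com",
    http_path="/sql/1.0/warehouses/abc123",
    access_token=EnvVar("DATABRICKS_TOKEN"),
)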
Delta Lake Integration
Work with Delta Lake tables:
from dagster import asset
from pyspark.sql import SparkSession

@asset
def delta_table_reader():
    spark = SparkSession.builder.getOrCreate()
    # Read Delta table
    df = spark.read.format("delta").load("dbfs:/mnt/delta/sales")
    # Perform operations
    filtered_df = df.filter(df.amount > 100)
    # Write to new Delta table
    filtered_df.write.format("delta").mode("overwrite").save(
        "dbfs:/mnt/delta/high_value_sales"
    )
    return {"row_count": filtered_df.count()}
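Delta's time travel makes it possible to read an earlier snapshot of a table, which is useful for debugging or reprocessing; a minimal sketch (the path, version, and timestamp are illustrative):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the table as of a specific version...
v0 = spark.read.format("delta").option("versionAsOf", 0).load("dbfs:/mnt/delta/sales")

# ...or as of a timestamp
snapshot = (
    spark.read.format("delta")
    .option("timestampAsOf", "2024-01-01")
    .load("dbfs:/mnt/delta/sales")
)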
Databricks Asset Bundles
Integrate Databricks Asset Bundles (DABs):
from dagster_databricks import DatabricksAssetBundleComponent
# Load assets from Databricks Asset Bundle
databricks_bundle = DatabricksAssetBundleComponent(
bundle_path="/path/to/bundle",
target="production",
)
PySpark Step Launcher
Run individual ops on Databricks clusters. This step launcher approach predates Dagster Pipes; prefer PipesDatabricksClient for new projects.
from dagster import Definitions, job, op
from dagster_databricks import DatabricksPySparkStepLauncher

# The op must require the step launcher resource so its step runs on Databricks
@op(required_resource_keys={"step_launcher"})
def spark_computation():
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("dbfs:/data/input")
    return df.count()

@job(
    resource_defs={
        "step_launcher": DatabricksPySparkStepLauncher(
            databricks_host="https://my-workspace.cloud.databricks.com",
            databricks_token={"env": "DATABRICKS_TOKEN"},
            cluster_id="1234-567890-abc123",
        )
    }
)
def databricks_pyspark_job():
    spark_computation()

defs = Definitions(jobs=[databricks_pyspark_job])
Authentication
Multiple authentication methods are supported:
Personal Access Token
from dagster import EnvVar

DatabricksClientResource(
    host="https://my-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)
Service Principal (OAuth)
from dagster_databricks import OauthCredentials

DatabricksClientResource(
    host="https://my-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id=EnvVar("DATABRICKS_CLIENT_ID"),
        client_secret=EnvVar("DATABRICKS_CLIENT_SECRET"),
    ),
)
Best Practices
- Use Pipes: Prefer PipesDatabricksClient for better observability and log streaming
- Cluster sizing: Right-size clusters for the workload and use autoscaling when appropriate (see the sketch after this list)
- Spot instances: Use spot instances for cost savings on fault-tolerant workloads
- Delta Lake: Store outputs in Delta format for ACID transactions and time travel
- Instance pools: Reuse warm instances via instance pools to reduce cluster startup time
- Secrets: Use Databricks Secrets for credential management
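A cluster spec that combines autoscaling with spot capacity might look like the following; a sketch of the Jobs API new_cluster fields (values are illustrative, and aws_attributes applies only to AWS workspaces):
# Illustrative new_cluster spec for a submitted run
new_cluster = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    # Autoscale between 2 and 8 workers instead of a fixed num_workers
    "autoscale": {"min_workers": 2, "max_workers": 8},
    # Prefer spot instances, falling back to on-demand when spot is unavailable
    "aws_attributes": {"availability": "SPOT_WITH_FALLBACK"},
}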
Monitoring and Observability
Track Databricks job metrics in Dagster:
from dagster import AssetExecutionContext, MetadataValue, asset
from dagster_databricks import DatabricksClientResource

@asset
def monitored_databricks_job(
    context: AssetExecutionContext,
    databricks: DatabricksClientResource,
):
    workspace = databricks.get_client().workspace_client
    # Submit the run; `task` is a jobs.SubmitTask configured as in the Quick Start
    waiter = workspace.jobs.submit(run_name="monitored_run", tasks=[task])
    run = waiter.result()
    # Fetch run details from the Jobs API
    run_info = workspace.jobs.get_run(run.run_id)
    # Report to Dagster
    context.add_output_metadata(
        {
            "run_id": run.run_id,
            "cluster_id": run_info.cluster_instance.cluster_id,
            "execution_duration_ms": run_info.execution_duration,
            "run_page_url": MetadataValue.url(run_info.run_page_url),
        }
    )
    return {"status": "success"}
Troubleshooting
Authentication failures
Verify token has correct permissions:
curl -H "Authorization: Bearer $DATABRICKS_TOKEN" \
https://my-workspace.cloud.databricks.com/api/2.0/clusters/list
Cluster startup timeout
Increase the poll timeout, or run the task on an already-running all-purpose cluster by specifying existing_cluster_id instead of new_cluster in the task configuration:
{
    "task_key": "main_task",
    "existing_cluster_id": "1234-567890-abc123",  # reuse a running cluster
    "spark_python_task": {"python_file": "dbfs:/scripts/etl_job.py"},
}
Library installation errors
Ensure libraries are compatible with Databricks runtime:
"libraries": [
{"pypi": {"package": "dagster-pipes==1.8.0"}}, # Pin version
]
API Reference
Key components:
DatabricksClientResource: Client for Databricks APIs
PipesDatabricksClient: Execute with Dagster Pipes
create_databricks_run_now_op: Trigger existing job
create_databricks_submit_run_op: Submit new job
DatabricksPySparkStepLauncher: Run ops on Databricks
DatabricksAssetBundleComponent: Load DABs
For complete documentation, see dagster-databricks API reference.
Next Steps