Assets are the core building blocks of Dagster. They represent data produced by your pipelines - files, database tables, ML models, or any other data artifact.
Basic Asset Definition
Define a simple asset using the @asset decorator:
import pandas as pd
import dagster as dg
@dg.asset
def processed_data():
    """Load the sample CSV, bucket ages into labelled groups, and save the result."""
    source = pd.read_csv("src/data/sample_data.csv")

    # Derive an age_group column from the age value.
    source["age_group"] = pd.cut(
        source["age"], bins=[0, 30, 40, 100], labels=["Young", "Middle", "Senior"]
    )

    # Persist the enriched frame next to the input data.
    source.to_csv("src/data/processed_data.csv", index=False)
    return "Data loaded successfully"
The asset name is automatically derived from the function name. Dagster tracks when this asset was last materialized and its dependencies.
Asset Dependencies
Assets can depend on other assets, creating a data pipeline:
import json
import os

import pandas as pd
import requests

from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset
@asset(group_name="hackernews", compute_kind="HackerNews API")
def topstory_ids() -> None:
    """Get up to 100 top stories from the HackerNews topstories endpoint."""
    newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    # The endpoint returns story ids newest-first; keep at most the first 100.
    top_new_story_ids = requests.get(newstories_url).json()[:100]
    # Ensure the local data directory exists before writing the JSON file.
    os.makedirs("data", exist_ok=True)
    with open("data/topstory_ids.json", "w") as f:
        json.dump(top_new_story_ids, f)
@asset(deps=[topstory_ids], group_name="hackernews", compute_kind="HackerNews API")
def topstories(context: AssetExecutionContext) -> MaterializeResult:
    """Get items based on story ids from the HackerNews items endpoint."""
    with open("data/topstory_ids.json") as f:
        story_ids = json.load(f)

    results = []
    for item_id in story_ids:
        url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        results.append(requests.get(url).json())
        # Emit a progress message every 20 fetched items.
        if len(results) % 20 == 0:
            context.log.info(f"Got {len(results)} items so far.")

    frame = pd.DataFrame(results)
    frame.to_csv("data/topstories.csv")

    # Surface a record count and a small preview in the Dagster UI.
    return MaterializeResult(
        metadata={
            "num_records": len(frame),
            "preview": MetadataValue.md(str(frame.head().to_markdown())),
        }
    )
Use deps for implicit dependencies where you don’t need the upstream asset’s value. Use function parameters for explicit dependencies where you need to access the upstream data.
For more control over dependencies, use AssetIn:
from dagster import AssetIn, asset
@asset(ins={"input2": AssetIn(key_prefix="something_else")})
def asset1(input1, input2):
    """Combine two upstream assets.

    ``input1`` loads from the asset named "input1"; ``input2`` is remapped
    by ``AssetIn`` to the key "something_else/input2".
    """
    combined = input1 + input2
    return combined
Asset Metadata
Add rich metadata to your assets to provide context and enable observability:
import base64
from io import BytesIO
import matplotlib.pyplot as plt
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset
@asset(deps=[topstories], group_name="hackernews", compute_kind="Plot")
def most_frequent_words(context: AssetExecutionContext) -> MaterializeResult:
    """Get the top 25 most frequent words in HackerNews story titles."""
    stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]
    topstories = pd.read_csv("data/topstories.csv")

    # Loop through titles and count word frequency, skipping stopwords.
    word_counts = {}
    for raw_title in topstories["title"]:
        title = raw_title.lower()
        for word in title.split():
            cleaned_word = word.strip(".,-!?:;()[]'\"\"")
            if cleaned_word not in stopwords and len(cleaned_word) > 0:
                word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    # Get the top 25 most frequent words.
    top_words = {
        pair[0]: pair[1]
        for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
    }

    # Make a bar chart of the word frequencies.
    plt.figure(figsize=(10, 6))
    plt.bar(list(top_words.keys()), list(top_words.values()))
    plt.xticks(rotation=45, ha="right")
    plt.title("Top 25 Words in Hacker News Titles")
    plt.tight_layout()

    # Convert the figure to a saveable in-memory PNG.
    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    image_data = base64.b64encode(buffer.getvalue())

    # FIX: the markdown preview string was garbled ('f"})"'); embed the plot
    # as an inline base64 data URI so it renders in the Dagster UI.
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    # Attach the rendered plot to the asset's materialization metadata.
    return MaterializeResult(metadata={"plot": MetadataValue.md(md_content)})
Multi-Assets
Define multiple assets that are computed together:
# FIX: AssetKey is used below but was missing from the import list.
from dagster import AssetKey, AssetOut, multi_asset


@multi_asset(
    outs={
        "a": AssetOut(),
        "b": AssetOut(),
        "c": AssetOut(),
    },
    # Declare which upstream inputs each output actually depends on so
    # Dagster can correctly subset the computation.
    internal_asset_deps={
        "a": {AssetKey("in1"), AssetKey("in2")},
        "b": set(),
        "c": {AssetKey("a"), AssetKey("b"), AssetKey("in2"), AssetKey("in3")},
    },
    can_subset=True,
)
def abc_(context, in1, in2, in3):
    """Compute assets a, b, and c together in a single step."""
    # compute_a / compute_b / compute_c are placeholders for real logic.
    a_result = compute_a(in1, in2)
    b_result = compute_b()
    c_result = compute_c(a_result, b_result, in2, in3)
    return a_result, b_result, c_result
Partitioned Assets
Partition assets to process data incrementally:
import datetime
import os
import pandas as pd
import dagster as dg
# Create the PartitionsDefinition: one partition per day from 2024-01-01 on.
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    """Write a CSV of (simulated) sales rows for this partition's date."""
    date = context.partition_key
    # Simulate fetching daily sales data.
    df = pd.DataFrame({
        "date": [date] * 10,
        "sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
    })
    os.makedirs("data/daily_sales", exist_ok=True)
    filename = f"data/daily_sales/sales_{date}.csv"
    df.to_csv(filename, index=False)
    # FIX: the log message had a garbled "(unknown)" placeholder; report the
    # actual output path instead.
    context.log.info(f"Daily sales data written to {filename}")
@dg.asset(
    partitions_def=daily_partitions,
    deps=[daily_sales_data],
)
def daily_sales_summary(context):
    """Read this partition's sales CSV and log a total-sales summary."""
    partition_date_str = context.partition_key
    csv_path = f"data/daily_sales/sales_{partition_date_str}.csv"
    sales_df = pd.read_csv(csv_path)

    summary = {
        "date": partition_date_str,
        "total_sales": sales_df["sales"].sum(),
    }
    context.log.info(f"Daily sales summary for {partition_date_str}: {summary}")
Define the partition scheme
Choose from DailyPartitionsDefinition, WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or create a custom StaticPartitionsDefinition.
Apply partitions to assets
Add the partitions_def parameter to your asset decorators.
Use context.partition_key to get the current partition being processed.
Configurable Resources
Use resources to configure external connections and share them across assets:
import requests
import dagster as dg
class SunResource(dg.ConfigurableResource):
    """Configurable client for the sunrise-sunset.org API."""

    latitude: str
    longitude: str
    time_zone: str

    @property
    def query_string(self) -> str:
        # Build the request URL from the configured location and time zone.
        return f"https://api.sunrise-sunset.org/json?lat={self.latitude}&lng={self.longitude}&date=today&tzid={self.time_zone}"

    def sunrise(self) -> str:
        """Return today's local sunrise time as reported by the API."""
        response = requests.get(self.query_string, timeout=5)
        payload = response.json()
        return payload["results"]["sunrise"]
@dg.asset
def sfo_sunrise(context: dg.AssetExecutionContext, sun_resource: SunResource) -> None:
    """Log today's sunrise time in San Francisco using the injected resource."""
    sunrise_time = sun_resource.sunrise()
    context.log.info(f"Sunrise in San Francisco is at {sunrise_time}.")
# Bind a configured SunResource under the key "sun_resource", matching the
# parameter name the sfo_sunrise asset requests it by.
sfo_sun_resource = SunResource(
    latitude="37.615223",
    longitude="-122.389977",
    time_zone="America/Los_Angeles",
)

defs = dg.Definitions(
    assets=[sfo_sunrise],
    resources={"sun_resource": sfo_sun_resource},
)
Integration Assets
Leverage pre-built integrations for popular data tools:
from dagster_airbyte import build_airbyte_assets
from dagster_dbt import DbtCliResource, dbt_assets
from dagster import AssetExecutionContext
# Airbyte assets
# Airbyte assets: one software-defined asset per destination table,
# namespaced under the "postgres_replica" key prefix.
airbyte_assets = build_airbyte_assets(
    connection_id="your-connection-id",  # replace with your Airbyte connection id
    destination_tables=["orders", "users"],
    asset_key_prefix=["postgres_replica"],
)
# dbt assets
@dbt_assets(manifest="/path/to/manifest.json")
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run `dbt build` and stream its events back to Dagster as asset events."""
    invocation = dbt.cli(["build"], context=context)
    yield from invocation.stream()
When using integration assets, ensure you have the corresponding library installed (e.g., dagster-dbt, dagster-airbyte) and properly configured resources.
Best Practices
Organize with Groups
Group related assets together for better organization in the UI:
# Sharing a group_name keeps related assets together in the Dagster UI.
@asset(group_name="analytics")
def revenue_metrics():
    """Placeholder asset in the "analytics" group."""
    pass


@asset(group_name="analytics")
def user_metrics():
    """Placeholder asset in the "analytics" group."""
    pass
Use Compute Kinds
Label assets with their computation type for clarity:
# compute_kind labels the asset with its computation technology in the UI.
@asset(compute_kind="dbt")
def dbt_model():
    """Placeholder for an asset computed by dbt."""
    pass


@asset(compute_kind="Python")
def python_transform():
    """Placeholder for an asset computed in plain Python."""
    pass
Return Useful Metadata
Always return useful metadata to make assets observable:
@asset
def my_asset() -> MaterializeResult:
    """Materialize a DataFrame and report observability metadata."""
    df = compute_data()
    # Record the row count, column list, and a small preview for the UI.
    asset_metadata = {
        "num_rows": len(df),
        "columns": MetadataValue.md(", ".join(df.columns)),
        "preview": MetadataValue.md(df.head().to_markdown()),
    }
    return MaterializeResult(metadata=asset_metadata)
Next Steps