
Partition Types

Partitions allow you to organize assets and jobs by time windows, dimensions, or custom criteria. Dagster provides several partition definition types for different use cases.

PartitionsDefinition

Abstract base class for all partition definitions. Defines a set of partitions that can be attached to an asset or job.

Common Methods

get_partition_keys
method
Returns a list of partition keys.
Parameters:
  • current_time (datetime, optional): Current time for time-based partitions
  • dynamic_partitions_store (DynamicPartitionsStore, optional): Store for dynamic partitions
Returns: Sequence[str] - List of partition keys
partition_keys = partitions_def.get_partition_keys()
# Returns: ["2024-01-01", "2024-01-02", "2024-01-03", ...]
get_partition_keys_in_range
method
Returns partition keys within a specified range.
Parameters:
  • partition_key_range (PartitionKeyRange): The range to query
Returns: Sequence[str] - Partition keys in the range
has_partition_key
method
Checks if a partition key exists.
Parameters:
  • partition_key (str): The partition key to check
Returns: bool - True if the partition exists
get_num_partitions
method
Returns the total number of partitions.
Returns: int - Number of partitions
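The four common methods can be pictured with a small plain-Python model. This is only a sketch: `SimplePartitions` is a hypothetical stand-in written for illustration, not a Dagster class, and it takes the range as two strings rather than a PartitionKeyRange.

```python
from typing import List, Sequence


class SimplePartitions:
    """Hypothetical stand-in for a partitions definition (not a Dagster class)."""

    def __init__(self, keys: Sequence[str]) -> None:
        self._keys: List[str] = list(keys)

    def get_partition_keys(self) -> List[str]:
        return list(self._keys)

    def get_partition_keys_in_range(self, start: str, end: str) -> List[str]:
        # Both ends of the range are inclusive.
        lo, hi = self._keys.index(start), self._keys.index(end)
        return self._keys[lo : hi + 1]

    def has_partition_key(self, key: str) -> bool:
        return key in self._keys

    def get_num_partitions(self) -> int:
        return len(self._keys)


parts = SimplePartitions(["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"])
in_range = parts.get_partition_keys_in_range("2024-01-02", "2024-01-03")
```

Here `in_range` holds the two middle keys, because the range includes both its start and its end key.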

StaticPartitionsDefinition

A statically-defined set of partitions with explicit partition keys.
from dagster import StaticPartitionsDefinition, asset

regions = StaticPartitionsDefinition(["us-east", "us-west", "eu-central", "ap-south"])

@asset(partitions_def=regions)
def regional_data(context):
    region = context.partition_key
    return fetch_data_for_region(region)

Constructor

  • partition_keys (Sequence[str], required): List of partition key strings. We recommend limiting each asset to 100,000 partitions or fewer.

Example: Country Partitions

countries = StaticPartitionsDefinition(
    ["USA", "Canada", "Mexico", "UK", "Germany", "France"]
)

@asset(partitions_def=countries)
def country_metrics(context):
    country = context.partition_key
    return calculate_metrics(country)

TimeWindowPartitionsDefinition

Partitions based on time windows with customizable cron schedules.
from dagster import TimeWindowPartitionsDefinition, asset
from datetime import datetime

hourly_partitions = TimeWindowPartitionsDefinition(
    start=datetime(2024, 1, 1),
    cron_schedule="0 * * * *",  # Every hour
    fmt="%Y-%m-%d-%H"
)

@asset(partitions_def=hourly_partitions)
def hourly_metrics(context):
    time_window = context.partition_time_window
    return compute_metrics(time_window.start, time_window.end)

Constructor

  • start (datetime | str, required): The start datetime or string for the partition set. A string value is parsed with fmt. The first partition starts at the first cron schedule tick equal to or after this value.
  • cron_schedule (str, required): Cron schedule string that determines the partition boundaries (e.g., "0 0 * * *" for daily at midnight).
  • fmt (str, required): Date format string for partition keys (e.g., "%Y-%m-%d").
  • timezone (str, optional): Timezone string (IANA format, e.g., "America/Los_Angeles"). Defaults to "UTC".
  • end (datetime | str, optional): End datetime for the partition set. The last partition ends before this value.
  • end_offset (int, optional): Number of partitions to extend beyond the current time. Defaults to 0 (the last partition ends before the current time).
  • exclusions (Sequence[str | datetime], optional): Cron strings or datetime objects to exclude from the partition set.
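The effect of end_offset can be illustrated with plain datetime arithmetic. This is a sketch under stated assumptions, not Dagster's implementation: `hourly_keys` is a hypothetical helper, it uses naive datetimes, and it treats a window as available once its end is no later than the current time.

```python
from datetime import datetime, timedelta
from typing import List


def hourly_keys(start: datetime, now: datetime, fmt: str, end_offset: int = 0) -> List[str]:
    """Keys for hourly windows that have elapsed by `now`, plus `end_offset` extra windows."""
    step = timedelta(hours=1)
    keys = []
    window_start = start
    # A window is included only once its end is no later than the current time.
    while window_start + step <= now:
        keys.append(window_start.strftime(fmt))
        window_start += step
    # A positive end_offset extends the set past the current time.
    for _ in range(end_offset):
        keys.append(window_start.strftime(fmt))
        window_start += step
    return keys


now = datetime(2024, 1, 1, 3, 30)
default_keys = hourly_keys(datetime(2024, 1, 1), now, "%Y-%m-%d-%H")
extended_keys = hourly_keys(datetime(2024, 1, 1), now, "%Y-%m-%d-%H", end_offset=1)
```

At 03:30 the default set stops at the 02:00 window, while end_offset=1 also includes the window that is still in progress.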

Properties

  • start (datetime): The start datetime of the partition set.
  • end (datetime | None): The end datetime of the partition set, if specified.
  • timezone (str): The timezone string for the partition set.
  • schedule_type (ScheduleType | None): Enum representing the partition cadence (HOURLY, DAILY, WEEKLY, or MONTHLY), if it matches a standard pattern.
  • minute_offset (int): Number of minutes past the hour when partitions start.
  • hour_offset (int): Number of hours past midnight when partitions start.
  • day_offset (int): For weekly/monthly partitions, the day when partitions start.

Time Window Methods

time_window_for_partition_key
method
Gets the time window for a specific partition key.
Parameters:
  • partition_key (str): The partition key
Returns: TimeWindow - The time window
window = partitions_def.time_window_for_partition_key("2024-01-01")
# window.start = datetime(2024, 1, 1, 0, 0, 0)
# window.end = datetime(2024, 1, 2, 0, 0, 0)
get_partition_keys_in_time_window
method
Gets all partition keys within a time window.
Parameters:
  • time_window (TimeWindow): The time window
Returns: Sequence[str] - Partition keys in the window
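The relationship between daily keys and time windows can be sketched with the standard library alone. The helpers `daily_window` and `daily_keys_between` are hypothetical names for illustration, and the sketch assumes naive datetimes and a query window aligned to midnight.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def daily_window(partition_key: str, fmt: str = "%Y-%m-%d") -> Tuple[datetime, datetime]:
    """Parse a daily key and return the (start, end) of its one-day window."""
    start = datetime.strptime(partition_key, fmt)
    return start, start + timedelta(days=1)


def daily_keys_between(window_start: datetime, window_end: datetime, fmt: str = "%Y-%m-%d") -> List[str]:
    """Keys of the daily windows inside [window_start, window_end), assuming midnight alignment."""
    keys = []
    day = window_start
    while day < window_end:
        keys.append(day.strftime(fmt))
        day += timedelta(days=1)
    return keys


window = daily_window("2024-01-01")
keys = daily_keys_between(datetime(2024, 1, 1), datetime(2024, 1, 4))
```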

Example: Custom Schedule

# Every 6 hours starting at 3am
# A string start is parsed with fmt, so it must include the time component
partitions = TimeWindowPartitionsDefinition(
    start="2024-01-01-03:00",
    cron_schedule="0 3,9,15,21 * * *",
    fmt="%Y-%m-%d-%H:%M",
    timezone="America/New_York"
)
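To see which keys a six-hour schedule like this produces, the ticks can be enumerated by hand. This is a rough sketch rather than Dagster's cron handling: `first_ticks` is a hypothetical helper, the tick hours are hard-coded from the cron string, and time zones are ignored. It starts from midnight to show that the first partition begins at the first tick equal to or after the start.

```python
from datetime import datetime, timedelta
from typing import List

TICK_HOURS = (3, 9, 15, 21)  # hours named in "0 3,9,15,21 * * *"


def first_ticks(start: datetime, count: int) -> List[datetime]:
    """First `count` schedule ticks at or after `start` (naive datetimes only)."""
    ticks: List[datetime] = []
    day = start.replace(hour=0, minute=0, second=0, microsecond=0)
    while len(ticks) < count:
        for hour in TICK_HOURS:
            tick = day.replace(hour=hour)
            # Skip ticks that fall before the requested start.
            if tick >= start and len(ticks) < count:
                ticks.append(tick)
        day += timedelta(days=1)
    return ticks


keys = [t.strftime("%Y-%m-%d-%H:%M") for t in first_ticks(datetime(2024, 1, 1), 5)]
```

With a midnight start, the first key is for 03:00 and the fifth falls on the following day.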

DailyPartitionsDefinition

Convenience subclass for daily partitions.
from dagster import DailyPartitionsDefinition, asset

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def daily_report(context):
    date = context.partition_key  # "2024-01-01" format
    return generate_report(date)

Constructor

  • start_date (str | datetime, required): Start date in "YYYY-MM-DD" format or as a datetime object.
  • end_date (str | datetime, optional): End date.
  • timezone (str, optional): Timezone string. Defaults to "UTC".
  • end_offset (int, optional): Number of partitions to extend beyond the current time. Defaults to 0.
  • fmt (str, optional): Date format string. Defaults to "%Y-%m-%d".

HourlyPartitionsDefinition

Convenience subclass for hourly partitions.
from dagster import HourlyPartitionsDefinition, asset

hourly_partitions = HourlyPartitionsDefinition(start_date="2024-01-01-00:00")

@asset(partitions_def=hourly_partitions)
def hourly_metrics(context):
    hour = context.partition_key  # "2024-01-01-00:00" format
    return compute_hourly_metrics(hour)

Constructor

  • start_date (str | datetime, required): Start datetime in "YYYY-MM-DD-HH:MM" format or as a datetime object.
  • end_date (str | datetime, optional): End datetime.
  • timezone (str, optional): Timezone string. Defaults to "UTC".
  • end_offset (int, optional): Number of partitions to extend beyond the current time. Defaults to 0.
  • fmt (str, optional): Datetime format string. Defaults to "%Y-%m-%d-%H:%M".

WeeklyPartitionsDefinition

Convenience subclass for weekly partitions.
from dagster import WeeklyPartitionsDefinition, asset

weekly_partitions = WeeklyPartitionsDefinition(
    start_date="2024-01-01",
    day_offset=1  # Start weeks on Monday (0=Sunday, 1=Monday, ...)
)

@asset(partitions_def=weekly_partitions)
def weekly_summary(context):
    week = context.partition_key
    return summarize_week(week)

Constructor

  • start_date (str | datetime, required): Start date.
  • end_date (str | datetime, optional): End date.
  • timezone (str, optional): Timezone string. Defaults to "UTC".
  • day_offset (int, optional): Day of the week on which partitions start (0=Sunday, 1=Monday, …, 6=Saturday). Defaults to 0.
  • end_offset (int, optional): Number of partitions to extend beyond the current time. Defaults to 0.
  • fmt (str, optional): Date format string. Defaults to "%Y-%m-%d".
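The day_offset numbering differs from Python's `date.weekday()`, which counts Monday as 0. The sketch below shows one way to find where the first weekly partition would begin. `first_week_start` is a hypothetical helper, and it assumes the first partition starts on the first matching day on or after the start date.

```python
from datetime import date, timedelta


def first_week_start(start: date, day_offset: int = 0) -> date:
    """First date on or after `start` that falls on `day_offset` (0=Sunday ... 6=Saturday)."""
    # date.weekday() counts Monday as 0, so shift it to Sunday-based numbering.
    sunday_based = (start.weekday() + 1) % 7
    return start + timedelta(days=(day_offset - sunday_based) % 7)


# 2024-01-01 is a Monday.
monday_start = first_week_start(date(2024, 1, 1), day_offset=1)
sunday_start = first_week_start(date(2024, 1, 1), day_offset=0)
```

With day_offset=1 the first week begins on the start date itself. With the default of 0 it begins on the following Sunday.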

MonthlyPartitionsDefinition

Convenience subclass for monthly partitions.
from dagster import MonthlyPartitionsDefinition, asset

monthly_partitions = MonthlyPartitionsDefinition(
    start_date="2024-01-01",
    day_offset=1  # Start months on the 1st day
)

@asset(partitions_def=monthly_partitions)
def monthly_aggregates(context):
    month = context.partition_key
    return aggregate_month(month)

Constructor

  • start_date (str | datetime, required): Start date.
  • end_date (str | datetime, optional): End date.
  • timezone (str, optional): Timezone string. Defaults to "UTC".
  • day_offset (int, optional): Day of the month on which partitions start (1-31). Defaults to 1.
  • end_offset (int, optional): Number of partitions to extend beyond the current time. Defaults to 0.
  • fmt (str, optional): Date format string. Defaults to "%Y-%m-%d".

DynamicPartitionsDefinition

Partitions whose keys can be dynamically added and removed at runtime.
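from dagster import (
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    define_asset_job,
    sensor,
)

fruits = DynamicPartitionsDefinition(name="fruits")

@asset(partitions_def=fruits)
def fruit_data(context):
    fruit = context.partition_key
    return fetch_fruit_data(fruit)  # placeholder for your own loading logic

# Job that targets the partitioned asset so the sensor can request runs of it
fruit_job = define_asset_job("fruit_job", selection=[fruit_data])

@sensor(job=fruit_job)
def new_fruit_sensor(context):
    new_fruits = check_for_new_fruits()  # placeholder returning a list of new keys
    if new_fruits:
        # Register the new keys and request one run per key in the same tick
        return SensorResult(
            run_requests=[RunRequest(partition_key=fruit) for fruit in new_fruits],
            dynamic_partitions_requests=[fruits.build_add_request(new_fruits)],
        )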

Constructor

  • name (str, required): Name of the dynamic partitions definition. Must be unique.

Methods

build_add_request
method
Creates a request to add new partitions.
Parameters:
  • partition_keys (Sequence[str]): Partition keys to add
Returns: AddDynamicPartitionsRequest
add_request = fruits.build_add_request(["apple", "banana"])
build_delete_request
method
Creates a request to delete partitions.
Parameters:
  • partition_keys (Sequence[str]): Partition keys to delete
Returns: DeleteDynamicPartitionsRequest
delete_request = fruits.build_delete_request(["old_fruit"])

Usage with Instance

from dagster import DagsterInstance

instance = DagsterInstance.get()

# Add partitions
instance.add_dynamic_partitions(
    partitions_def_name="fruits",
    partition_keys=["apple", "banana", "cherry"]
)

# Get partitions
partition_keys = instance.get_dynamic_partitions("fruits")

# Delete partitions  
instance.delete_dynamic_partition(
    partitions_def_name="fruits",
    partition_key="cherry"
)
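The instance calls above store keys under the definition's name, which is why the name must be unique. A toy in-memory model makes that bookkeeping visible. `InMemoryDynamicPartitions` is a hypothetical class for illustration, not part of Dagster, and it assumes that re-adding an existing key or deleting a missing key is a no-op.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


class InMemoryDynamicPartitions:
    """Hypothetical stand-in for the instance-backed partition store (not a Dagster class)."""

    def __init__(self) -> None:
        self._keys: Dict[str, List[str]] = defaultdict(list)

    def add(self, partitions_def_name: str, partition_keys: Sequence[str]) -> None:
        existing = self._keys[partitions_def_name]
        for key in partition_keys:
            if key not in existing:  # assumption: adding an existing key does nothing
                existing.append(key)

    def delete(self, partitions_def_name: str, partition_key: str) -> None:
        keys = self._keys[partitions_def_name]
        if partition_key in keys:  # assumption: deleting a missing key does nothing
            keys.remove(partition_key)

    def get(self, partitions_def_name: str) -> List[str]:
        return list(self._keys[partitions_def_name])


store = InMemoryDynamicPartitions()
store.add("fruits", ["apple", "banana", "cherry"])
store.add("fruits", ["banana", "date"])  # "banana" is already present
store.add("users", ["user_1"])           # a different name keeps a separate key set
store.delete("fruits", "cherry")
remaining = store.get("fruits")
```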

MultiPartitionsDefinition

Creates a cross-product of partitions from two partitions definitions.
from dagster import MultiPartitionsDefinition, DailyPartitionsDefinition, StaticPartitionsDefinition, asset

partitions = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2024-01-01"),
    "region": StaticPartitionsDefinition(["us-east", "us-west", "eu"])
})

@asset(partitions_def=partitions)
def regional_daily_metrics(context):
    partition_key = context.partition_key  # MultiPartitionKey object
    date = partition_key.keys_by_dimension["date"]     # "2024-01-01"
    region = partition_key.keys_by_dimension["region"] # "us-east"
    return compute_metrics(date, region)

Constructor

  • partitions_defs (Mapping[str, PartitionsDefinition], required): Mapping of dimension name to partitions definition. Currently supports exactly 2 dimensions. Supported types are StaticPartitionsDefinition, TimeWindowPartitionsDefinition, and DynamicPartitionsDefinition.
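Because the result is a cross-product, the total partition count is the product of the two dimension sizes. The following sketch enumerates the combinations with `itertools.product`. The key lists are illustrative values, and the dictionaries only mirror the shape of keys_by_dimension. They are not Dagster objects.

```python
from itertools import product

# Illustrative keys for each dimension (two dates, three regions).
dimensions = {
    "date": ["2024-01-01", "2024-01-02"],
    "region": ["us-east", "us-west", "eu"],
}

names = list(dimensions)
# One dict per combination, shaped like keys_by_dimension.
combined = [dict(zip(names, combo)) for combo in product(*dimensions.values())]
total = len(combined)
```

Two dates and three regions give six partitions, so a long-running daily dimension multiplies quickly against even a short static list.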

MultiPartitionKey

Represents a key in a multi-dimensional partition space.
  • keys_by_dimension (Mapping[str, str]): Mapping from dimension name to partition key.
# Accessing multi-partition key components
date = context.partition_key.keys_by_dimension["date"]
region = context.partition_key.keys_by_dimension["region"]

Example: Time and Category

from dagster import MultiPartitionsDefinition, HourlyPartitionsDefinition, StaticPartitionsDefinition

partitions = MultiPartitionsDefinition({
    "hour": HourlyPartitionsDefinition(start_date="2024-01-01-00:00"),
    "category": StaticPartitionsDefinition(["electronics", "clothing", "food"])
})

@asset(partitions_def=partitions)
def hourly_category_sales(context):
    hour = context.partition_key.keys_by_dimension["hour"]
    category = context.partition_key.keys_by_dimension["category"]
    return calculate_sales(hour, category)

PartitionKeyRange

Represents a range of partition keys.
from dagster import PartitionKeyRange

key_range = PartitionKeyRange(start="2024-01-01", end="2024-01-07")
partition_keys = partitions_def.get_partition_keys_in_range(key_range)
  • start (str, required): The starting partition key (inclusive).
  • end (str, required): The ending partition key (inclusive).

TimeWindow

Represents a time window for time-based partitions.
  • start (datetime): The start datetime of the window.
  • end (datetime): The end datetime of the window.
@asset(partitions_def=daily_partitions)
def my_asset(context):
    window = context.partition_time_window
    start = window.start  # datetime object
    end = window.end      # datetime object
    return process_data_in_window(start, end)
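A common use of the window is to select only the records that belong to the partition. The sketch below assumes the window is half-open (start inclusive, end exclusive), so a record stamped exactly at the end belongs to the next partition. `rows_in_window` and the sample rows are hypothetical.

```python
from datetime import datetime
from typing import Dict, List


def rows_in_window(rows: List[Dict], start: datetime, end: datetime) -> List[Dict]:
    """Keep rows whose timestamp falls in [start, end)."""
    return [row for row in rows if start <= row["ts"] < end]


rows = [
    {"id": 1, "ts": datetime(2024, 1, 1, 0, 0)},
    {"id": 2, "ts": datetime(2024, 1, 1, 23, 59)},
    {"id": 3, "ts": datetime(2024, 1, 2, 0, 0)},  # belongs to the next day's window
]
selected = rows_in_window(rows, datetime(2024, 1, 1), datetime(2024, 1, 2))
selected_ids = [row["id"] for row in selected]
```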

Best Practices

Partition Count Limits

We recommend limiting partition counts to 100,000 or fewer per asset for optimal performance.
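A quick back-of-the-envelope calculation shows how long different cadences take to reach that count. The helper `years_until_limit` is hypothetical, and the sketch ignores leap years.

```python
LIMIT = 100_000  # recommended maximum partitions per asset


def years_until_limit(partitions_per_day: int, limit: int = LIMIT) -> float:
    """Approximate years before an asset accumulates `limit` partitions."""
    return limit / (partitions_per_day * 365)


daily_years = years_until_limit(1)      # one partition per day
hourly_years = years_until_limit(24)    # one partition per hour
multi_years = years_until_limit(50)     # daily partitions crossed with 50 static keys
```

Daily partitions stay under the limit for centuries, hourly partitions for roughly 11.4 years, and a daily dimension crossed with 50 static keys for about 5.5 years.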

Time-based Partitions

# Good: Clear, consistent time boundaries
DailyPartitionsDefinition(
    start_date="2024-01-01",
    timezone="America/New_York"
)

# Good: Custom schedule with explicit format
TimeWindowPartitionsDefinition(
    start="2024-01-01",
    cron_schedule="0 0 * * *",
    fmt="%Y-%m-%d",
    timezone="UTC"
)

Static Partitions

# Good: Explicit list of meaningful partitions
StaticPartitionsDefinition(["bronze", "silver", "gold", "platinum"])

# Avoid: Too many static partitions (use dynamic instead)
# StaticPartitionsDefinition([str(i) for i in range(100000)])

Dynamic Partitions

# Good: Named dynamic partitions with controlled addition
users_partitions = DynamicPartitionsDefinition(name="users")

@sensor(job=process_user_job)
def new_users_sensor(context):
    new_users = get_new_user_ids()
    if new_users:
        return SensorResult(
            dynamic_partitions_requests=[
                users_partitions.build_add_request(new_users)
            ]
        )
