Skip to main content

AssetExecutionContext

The context object that is available as the first argument to asset compute functions. This context provides system information such as resources, configuration, logging, and partition information.

Class Definition

class AssetExecutionContext:
    def __init__(self, op_execution_context: OpExecutionContext) -> None
Defined in: dagster._core.execution.context.asset_execution_context

Getting the Context

@staticmethod
def get() -> AssetExecutionContext
Retrieve the current AssetExecutionContext from the execution scope. Returns: The current AssetExecutionContext Raises: DagsterInvariantViolationError if no context is in scope
from dagster import asset, AssetExecutionContext

@asset
def my_asset():
    # Inside the asset, you can get the context
    context = AssetExecutionContext.get()
    context.log.info("Retrieved context")

Core Properties

log
DagsterLogManager
The log manager for this execution. Logs are viewable in the Dagster UI.
@asset
def logger_asset(context: AssetExecutionContext):
    context.log.info("Info level message")
    context.log.warning("Warning message")
    context.log.error("Error message")
run
DagsterRun
The DagsterRun object for the current execution. Access run metadata including run_id, run_config, and tags.
@asset
def my_asset(context: AssetExecutionContext):
    run_id = context.run.run_id
    tags = context.run.tags
    config = context.run.run_config
instance
DagsterInstance
The current Dagster instance.
@asset
def my_asset(context: AssetExecutionContext):
    # Access instance-level configuration
    storage = context.instance.storage_directory()
resources
Any
The resources available in the execution context.
@asset
def my_asset(context: AssetExecutionContext):
    # Access configured resources
    db = context.resources.database
    api_client = context.resources.api
pdb
ForkedPdb
Provides access to pdb debugging from within the asset.
@asset
def debug_asset(context: AssetExecutionContext):
    context.pdb.set_trace()  # Enter debugger

Job and Op Information

job_def
JobDefinition
The definition for the currently executing job.
job_name
str
The name of the currently executing job.
op_def
OpDefinition
The current op definition.
op_execution_context
OpExecutionContext
Access to the underlying OpExecutionContext for advanced use cases.
retry_number
int
Which retry attempt is currently executing (0 for initial attempt, 1 for first retry, etc.).
repository_def
RepositoryDefinition
The Dagster repository for the currently executing job.

Asset Information

asset_key
AssetKey
The AssetKey for the current asset. For multi-assets with multiple outputs, use asset_key_for_output() instead.
@asset
def my_asset(context: AssetExecutionContext):
    key = context.asset_key  # AssetKey(["my_asset"])
has_assets_def
bool
Whether there is a backing AssetsDefinition for what is currently executing.
assets_def
AssetsDefinition
The backing AssetsDefinition for what is currently executing. Raises: DagsterInvalidPropertyError if not available
selected_asset_keys
AbstractSet[AssetKey]
The set of AssetKeys this execution is expected to materialize.
selected_asset_check_keys
AbstractSet[AssetCheckKey]
The asset check keys corresponding to the current selection of assets.

Methods

asset_key_for_output

def asset_key_for_output(self, output_name: str = "result") -> AssetKey
Return the AssetKey for the corresponding output.
output_name
str
default:"result"
The name of the output. For @asset, this is automatically "result". For @multi_asset, specify the output name from AssetOut.
Returns: AssetKey
from dagster import AssetOut, multi_asset

@multi_asset(
    outs={
        "asset1": AssetOut(key=["my_assets", "asset1"]),
        "asset2": AssetOut(key=["my_assets", "asset2"]),
    }
)
def my_multi_asset(context):
    key1 = context.asset_key_for_output("asset1")
    key2 = context.asset_key_for_output("asset2")
    return {"asset1": data1, "asset2": data2}

output_for_asset_key

def output_for_asset_key(self, asset_key: AssetKey) -> str
Return the output name for the corresponding asset key.
asset_key
AssetKey
The asset key to look up.
Returns: str - The output name

asset_key_for_input

def asset_key_for_input(self, input_name: str) -> AssetKey
Return the AssetKey for the corresponding input.
input_name
str
The name of the input.
Returns: AssetKey

Partition Information

has_partition_key
bool
Whether the current run targets a single partition.
partition_key
str
The partition key for the current run. Raises: Error if not a partitioned run or operating over a range of partitions.
from dagster import asset, DailyPartitionsDefinition

partitions_def = DailyPartitionsDefinition("2023-08-20")

@asset(partitions_def=partitions_def)
def my_asset(context: AssetExecutionContext):
    context.log.info(context.partition_key)
    # Materializing 2023-08-21 partition logs: "2023-08-21"
partition_keys
Sequence[str]
List of partition keys for the current run. Useful for backfills of multiple partitions.
@asset(partitions_def=DailyPartitionsDefinition("2023-08-20"))
def my_asset(context: AssetExecutionContext):
    context.log.info(context.partition_keys)
    # Backfill of 2023-08-21 through 2023-08-25 logs:
    # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
has_partition_key_range
bool
Whether the current run targets a range of partitions.
partition_key_range
PartitionKeyRange
The range of partition keys for the current run. Returns a range with the same start and end for single partition runs.
@asset(partitions_def=DailyPartitionsDefinition("2023-08-20"))
def my_asset(context: AssetExecutionContext):
    key_range = context.partition_key_range
    # Backfill logs: PartitionKeyRange(start="2023-08-21", end="2023-08-25")
partition_time_window
TimeWindow
The partition time window for the current run. Raises: Error if not partitioned or not using TimeWindowPartitionsDefinition.
@asset(partitions_def=DailyPartitionsDefinition("2023-08-20"))
def my_asset(context: AssetExecutionContext):
    window = context.partition_time_window
    # Materializing 2023-08-21 logs: TimeWindow("2023-08-21", "2023-08-22")

Partition Methods for Inputs

def asset_partition_key_for_input(self, input_name: str) -> str
def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]
def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange
def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow
def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition
These methods provide partition information for input assets.
from dagster import asset, AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping

partitions_def = DailyPartitionsDefinition("2023-08-20")

@asset(partitions_def=partitions_def)
def upstream_asset():
    ...

@asset(
    partitions_def=partitions_def,
    ins={
        "upstream_asset": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        )
    }
)
def downstream_asset(context: AssetExecutionContext, upstream_asset):
    # Get partition key of the upstream asset
    upstream_key = context.asset_partition_key_for_input("upstream_asset")
    # Materializing 2023-08-21 partition, upstream_key = "2023-08-20"

Metadata Management

add_output_metadata

def add_output_metadata(
    self,
    metadata: Mapping[str, Any],
    output_name: str | None = None,
    mapping_key: str | None = None,
) -> None
Add metadata to an output. Can be invoked multiple times; last value for duplicate keys wins.
metadata
Mapping[str, Any]
required
The metadata to attach to the output.
output_name
str | None
The name of the output. Not needed for single-output ops.
mapping_key
str | None
The mapping key for dynamic outputs.
from dagster import asset, AssetOut, multi_asset

@asset
def my_asset(context):
    context.add_output_metadata({
        "row_count": 100,
        "preview": "data preview"
    })
    return data

@multi_asset(outs={"a": AssetOut(), "b": AssetOut()})
def multi_output_asset(context):
    context.add_output_metadata({"foo": "bar"}, output_name="a")
    context.add_output_metadata({"baz": "bat"}, output_name="b")
    return ("data_a", "data_b")

add_asset_metadata

def add_asset_metadata(
    self,
    metadata: Mapping[str, Any],
    asset_key: CoercibleToAssetKey | None = None,
    partition_key: str | None = None,
) -> None
Add metadata to an asset materialization event. Visible in the Dagster UI.
metadata
Mapping[str, Any]
required
The metadata to add to the asset materialization.
asset_key
CoercibleToAssetKey | None
The asset key. Not needed if only one asset is being materialized.
partition_key
str | None
The partition key for partitioned assets. If not provided on a partitioned asset, metadata is added to all partitions being materialized.
import dagster as dg

@dg.asset
def my_asset(context):
    context.add_asset_metadata({"row_count": 1000})
    return data

@dg.asset(partitions_def=dg.StaticPartitionsDefinition(["a", "b"]))
def partitioned_asset(context):
    # Add to all partitions
    context.add_asset_metadata({"source": "api"})
    
    # Add to specific partition
    for partition_key in context.partition_keys:
        if partition_key == "a":
            context.add_asset_metadata(
                {"special": "value"}, 
                partition_key=partition_key
            )

@dg.multi_asset(specs=[dg.AssetSpec("asset1"), dg.AssetSpec("asset2")])
def my_multi_asset(context):
    context.add_asset_metadata({"key": "value"}, asset_key="asset1")
    # Must specify asset_key in multi-assets

get_output_metadata

def get_output_metadata(
    self,
    output_name: str,
    mapping_key: str | None = None,
) -> Mapping[str, Any] | None
Retrieve metadata that has been set for a specific output.

Event Logging

log_event

def log_event(self, event: UserEvent) -> None
Log an AssetMaterialization, AssetObservation, or ExpectationResult. Events appear in the event log and DagsterEvents list.
event
AssetMaterialization | AssetObservation | ExpectationResult
required
The event to log.
from dagster import asset, AssetMaterialization

@asset
def my_asset(context):
    context.log_event(AssetMaterialization(
        asset_key="external_asset",
        description="Materialized external asset"
    ))
    return data

Data Lineage

get_asset_provenance

def get_asset_provenance(self, asset_key: AssetKey) -> DataProvenance | None
Return provenance information for the most recent materialization of an asset.
asset_key
AssetKey
required
The asset key to retrieve provenance for.
Returns: DataProvenance | None - Provenance information, or None if never materialized or record is too old.
@asset
def downstream_asset(context: AssetExecutionContext, upstream_asset):
    provenance = context.get_asset_provenance(AssetKey(["upstream_asset"]))
    if provenance:
        context.log.info(f"Data version: {provenance.data_version}")

Debugging

describe_op

def describe_op(self) -> str
Return a string description of the currently executing op.

get_mapping_key

def get_mapping_key(self) -> str | None
Return the mapping key if downstream of a DynamicOutput, otherwise None.
  • materialize - Execute assets with this context
  • Output - Define op outputs
  • @asset - Define software-defined assets

Build docs developers (and LLMs) love