The @op decorator defines an op, the basic unit of computation in Dagster. Ops are composed into graphs and jobs.
Signature
```python
@op(
    name: Optional[str] = None,
    description: Optional[str] = None,
    ins: Optional[Mapping[str, In]] = None,
    out: Optional[Union[Out, Mapping[str, Out]]] = None,
    config_schema: Optional[UserConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    tags: Optional[Mapping[str, Any]] = None,
    code_version: Optional[str] = None,
    retry_policy: Optional[RetryPolicy] = None,
    pool: Optional[str] = None,
) -> OpDefinition
```
Parameters
name
Optional[str]
Name of the op. Must be unique within any GraphDefinition that uses the op. If not provided, defaults to the name of the decorated function.

description
Optional[str]
Human-readable description of the op. If not provided and the decorated function has a docstring, the docstring is used as the description.

ins
Optional[Mapping[str, In]]
Information about the op's inputs. It is combined with whatever can be inferred from the function signature.

out
Optional[Union[Out, Mapping[str, Out]]]
Information about the op's outputs. If the function does not use yield, it is combined with whatever can be inferred from the return type annotation.

config_schema
Optional[UserConfigSchema]
The schema for the op's config. If set, Dagster checks that config provided for the op matches this schema and fails if it does not. If not set, Dagster accepts any config.

required_resource_keys
Optional[AbstractSet[str]]
Set of resource keys required by this op.

tags
Optional[Mapping[str, Any]]
Arbitrary metadata for the op. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings are JSON-encoded and must satisfy json.loads(json.dumps(value)) == value.

code_version
Optional[str]
Version of the logic encapsulated by the op. If set, it is used as the default version for all of the op's outputs.

retry_policy
Optional[RetryPolicy]
The retry policy for this op.

pool
Optional[str]
A string identifying the concurrency pool that governs this op's execution.
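The `tags` rule for non-string values can be checked in plain Python before the values are attached to an op. The sketch below uses a hypothetical helper, `is_json_roundtrippable`, which is not part of Dagster; it only applies the `json.loads(json.dumps(value)) == value` criterion stated above to a few sample values.

```python
import json
from typing import Any


def is_json_roundtrippable(value: Any) -> bool:
    """Hypothetical helper: True if json.loads(json.dumps(value)) == value."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unserializable objects such as sets.
        return False


candidate_tags = {
    "priority": 3,                  # an int round-trips unchanged
    "sources": ["s3", "postgres"],  # a list of strings round-trips unchanged
    "shape": (10, 2),               # a tuple comes back as a list, so it fails
    "lookup": {1: "a"},             # int keys come back as strings, so it fails
    "ids": {1, 2},                  # a set cannot be JSON-encoded at all
}

for key, value in candidate_tags.items():
    print(key, is_json_roundtrippable(value))
```

Tuples and non-string dictionary keys are the usual surprises: they serialize without error but do not compare equal after decoding.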
Returns
Type: OpDefinition
An op definition object.
Examples
Basic Op
```python
from dagster import op

@op
def hello_world():
    print('hello')
```
Op with Type Hints
```python
from dagster import op

@op
def add_one(x: int) -> int:
    return x + 1
```
Op with Explicit Inputs and Outputs
```python
from dagster import op, In, Out

@op(
    ins={'msg': In(str)},
    out=Out(str)
)
def echo(msg):
    return msg
```
Op with Multiple Outputs
```python
from dagster import op, Out
from typing import Tuple

@op(
    out={'word': Out(), 'num': Out()}
)
def multi_out() -> Tuple[str, int]:
    return 'cool', 4
```
Op with Context
```python
from dagster import op, OpExecutionContext

@op
def context_op(context: OpExecutionContext, x: int) -> int:
    context.log.info(f"Processing value: {x}")
    return x * 2
```
Op with Configuration
```python
from dagster import op, Config
from pydantic import Field

class MyOpConfig(Config):
    multiplier: int = Field(default=1, description="Value to multiply by")

@op
def configurable_op(config: MyOpConfig, x: int) -> int:
    return x * config.multiplier
```
Op with Resources
```python
from dagster import op, OpExecutionContext

@op(required_resource_keys={"database"})
def database_op(context: OpExecutionContext):
    # Access the database resource
    db = context.resources.database
    return db.query("SELECT * FROM users")
```
Op with Retry Policy
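```python
from dagster import op, RetryPolicy, Backoff

def call_external_api():
    # Hypothetical placeholder; replace with your own client call.
    return {"status": "ok"}

@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=1,  # base delay in seconds
        backoff=Backoff.EXPONENTIAL,
    )
)
def flaky_operation():
    # If this raises, Dagster retries the op up to 3 more times,
    # with waits growing exponentially from the 1-second base delay
    result = call_external_api()
    return result
```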
Async Op
```python
from dagster import op
import asyncio

@op
async def async_op() -> str:
    await asyncio.sleep(1)
    return "done"
```
Op Yielding Events
```python
from dagster import op, Output, AssetMaterialization

@op
def event_op():
    # Yield metadata about execution
    yield AssetMaterialization(
        asset_key="my_dataset",
        description="Materialized the dataset"
    )
    # Yield the actual output
    yield Output(42)
```
See Also
- @graph - Compose ops into reusable graphs
- @job - Create executable jobs from ops
- Output - Define op outputs
- Resources - Configure external services