The @op decorator defines an op, the basic unit of computation in Dagster. Ops are composed into graphs and jobs.

Signature

@op(
    name: Optional[str] = None,
    description: Optional[str] = None,
    ins: Optional[Mapping[str, In]] = None,
    out: Optional[Union[Out, Mapping[str, Out]]] = None,
    config_schema: Optional[UserConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    tags: Optional[Mapping[str, Any]] = None,
    code_version: Optional[str] = None,
    retry_policy: Optional[RetryPolicy] = None,
    pool: Optional[str] = None,
) -> OpDefinition

Parameters

name
Optional[str]
Name of the op. Must be unique within any GraphDefinition using the op. If not provided, defaults to the function name.
description
Optional[str]
Human-readable description of this op. If not provided and the decorated function has a docstring, that docstring will be used as the description.
ins
Optional[Dict[str, In]]
Information about the inputs to the op. Information provided here will be combined with what can be inferred from the function signature.
out
Optional[Union[Out, Dict[str, Out]]]
Information about the op outputs. Information provided here will be combined with what can be inferred from the return type signature if the function does not use yield.
config_schema
Optional[ConfigSchema]
The schema for the config. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided.
required_resource_keys
Optional[Set[str]]
Set of resource keys required by this op. Each key is available at runtime as an attribute of context.resources.
tags
Optional[Dict[str, Any]]
Arbitrary metadata for the op. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
code_version
Optional[str]
Version of the logic encapsulated by the op. If set, this is used as a default version for all outputs.
retry_policy
Optional[RetryPolicy]
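The retry policy for this op. It controls how many times the op is retried after it raises an exception and how long to wait between attempts.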
pool
Optional[str]
A string that identifies the concurrency pool that governs this op’s execution.
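
The round-trip criterion for non-string tag values can be checked before the op is defined. The sketch below uses only the standard library; `is_valid_tag_value` is a hypothetical helper name, not part of Dagster, and it only mirrors the `json.loads(json.dumps(value)) == value` rule stated above.

```python
import json


def is_valid_tag_value(value) -> bool:
    """Return True if the value is a string or survives a JSON round trip unchanged."""
    if isinstance(value, str):
        return True
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Not JSON serializable at all (for example, a set).
        return False


print(is_valid_tag_value({"team": "data", "priority": 1}))  # True
print(is_valid_tag_value((1, 2)))    # False: a tuple comes back as a list
print(is_valid_tag_value({1: "a"}))  # False: the int key comes back as "1"
print(is_valid_tag_value({1, 2}))    # False: sets cannot be JSON encoded
```

Values that fail this check are best converted to strings or plain lists and dicts before being passed as tags.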

Returns

Type: OpDefinition
An op definition object.

Examples

Basic Op

from dagster import op

@op
def hello_world():
    print('hello')

Op with Type Hints

from dagster import op

@op
def add_one(x: int) -> int:
    return x + 1

Op with Explicit Inputs and Outputs

from dagster import op, In, Out

@op(
    ins={'msg': In(str)},
    out=Out(str)
)
def echo(msg):
    return msg

Op with Multiple Outputs

from dagster import op, Out
from typing import Tuple

@op(
    out={'word': Out(), 'num': Out()}
)
def multi_out() -> Tuple[str, int]:
    return 'cool', 4

Op with Context

from dagster import op, OpExecutionContext

@op
def context_op(context: OpExecutionContext, x: int) -> int:
    context.log.info(f"Processing value: {x}")
    return x * 2

Op with Configuration

from dagster import op, Config
from pydantic import Field

class MyOpConfig(Config):
    multiplier: int = Field(default=1, description="Value to multiply by")

@op
def configurable_op(config: MyOpConfig, x: int) -> int:
    return x * config.multiplier

Op with Resources

from dagster import op, OpExecutionContext

@op(required_resource_keys={"database"})
def database_op(context: OpExecutionContext):
    # Access the database resource
    db = context.resources.database
    return db.query("SELECT * FROM users")

Op with Retry Policy

from dagster import op, RetryPolicy, Backoff

@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=1,
        backoff=Backoff.EXPONENTIAL,
    )
)
def flaky_operation():
    # If this raises, Dagster retries it up to 3 times with exponential backoff.
    # call_external_api is a placeholder for your own function.
    result = call_external_api()
    return result

Async Op

from dagster import op
import asyncio

@op
async def async_op() -> str:
    await asyncio.sleep(1)
    return "done"

Op Yielding Events

from dagster import op, Output, AssetMaterialization

@op
def event_op():
    # Yield metadata about execution
    yield AssetMaterialization(
        asset_key="my_dataset",
        description="Materialized the dataset"
    )
    # Yield the actual output
    yield Output(42)

See Also

  • @graph - Compose ops into reusable graphs
  • @job - Create executable jobs from ops
  • Output - Define op outputs
  • Resources - Configure external services
