
Ops, Jobs & Graphs

While assets are the recommended way to build data pipelines in Dagster, ops, jobs, and graphs provide lower-level primitives for task-based workflows. These are useful when you need fine-grained control over execution or are working with imperative, task-oriented logic.

Overview

  • Op: A single unit of computation (formerly called “solid”)
  • Job: An executable set of ops with defined execution semantics
  • Graph: A reusable composition of ops that can be converted to jobs
For most data pipelines, we recommend using assets instead of ops and jobs. Assets provide better observability, automatic lineage tracking, and a more intuitive mental model centered on data artifacts.

Ops

An op is a function that performs a discrete unit of work. Define ops using the @op decorator:
import dagster as dg

@dg.op
def return_five():
    return 5

@dg.op
def add_one(arg):
    return arg + 1

Op Configuration

Ops can declare typed inputs and outputs using In and Out:
from dagster import op, In, Out

@op(
    ins={"x": In(dagster_type=int)},
    out=Out(dagster_type=int),
)
def multiply_by_two(x):
    return x * 2

Op Context

Access runtime information using the context:
from dagster import op, OpExecutionContext

@op
def log_details(context: OpExecutionContext):
    context.log.info("Running op")
    context.log.info(f"Run ID: {context.run_id}")
    context.log.info(f"Op config: {context.op_config}")

Jobs

A job defines an executable graph of ops. Create jobs using the @job decorator:
import dagster as dg

@dg.op
def return_five():
    return 5

@dg.op
def add_one(arg):
    return arg + 1

@dg.job
def do_stuff():
    add_one(return_five())
When you decorate a function with @job, Dagster:
  1. Analyzes the function body to extract op invocations
  2. Builds a dependency graph based on data flow
  3. Creates an executable job definition

Job Configuration

Jobs accept configuration for resources, execution, and more:
from dagster import job, op, Config

class ProcessConfig(Config):
    batch_size: int

@op
def process_data(config: ProcessConfig):
    return process_batch(config.batch_size)  # process_batch stands in for your batch logic

@job(
    tags={"environment": "production"},
    resource_defs={"database": database_resource},
    config={
        "ops": {
            "process_data": {"config": {"batch_size": 100}}
        }
    },
)
def production_job():
    process_data()

Executing Jobs

Use execute_in_process to run a job synchronously in the current process:

# Execute a job directly
result = do_stuff.execute_in_process()

# Check if successful
assert result.success

# Access output values
output_value = result.output_for_node("add_one")

Graphs

A graph is a reusable composition of ops that can be converted into multiple jobs with different configurations:
import dagster as dg

class Server(dg.ConfigurableResource):
    def ping_server(self): ...

@dg.op
def interact_with_server(server: Server):
    server.ping_server()

@dg.graph
def do_stuff():
    interact_with_server()

# Create multiple jobs from the same graph; mock resources stand in here
# for distinct production and local Server implementations
prod_server = dg.ResourceDefinition.mock_resource()
local_server = dg.ResourceDefinition.mock_resource()

prod_job = do_stuff.to_job(
    resource_defs={"server": prod_server},
    name="do_stuff_prod"
)

local_job = do_stuff.to_job(
    resource_defs={"server": local_server},
    name="do_stuff_local"
)
Graphs are useful when you want to:
  • Reuse the same computation logic across environments (dev, staging, prod)
  • Test the same logic with different resource implementations
  • Create multiple variants of a pipeline with different configurations

Nested Graphs

Graphs can contain other graphs, allowing you to build modular compositions:
from dagster import op, graph

@op
def load_data():
    return [1, 2, 3]

@op
def transform(data):
    return [x * 2 for x in data]

@graph
def etl_subgraph():
    return transform(load_data())

@op
def save_data(data):
    write_to_disk(data)  # write_to_disk stands in for your persistence logic

@graph
def full_pipeline():
    result = etl_subgraph()
    save_data(result)

Op Dependencies

Dagster infers dependencies from the data flow between ops:
from dagster import job, op

@op
def fetch_users():
    return [{"id": 1, "name": "Alice"}]

@op
def fetch_orders():
    return [{"user_id": 1, "amount": 100}]

@op
def join_data(users, orders):
    # This op depends on both fetch_users and fetch_orders
    return merge(users, orders)

@job
def data_pipeline():
    users = fetch_users()
    orders = fetch_orders()
    join_data(users, orders)

Fan-out and Fan-in

Ops can have multiple outputs consumed by different downstream ops (fan-out) or combine outputs from multiple upstream ops (fan-in):
from dagster import job, op

@op
def load_raw_data():
    return load_from_source()

@op
def validate(data):
    return validate_schema(data)

@op
def transform_a(data):
    return transform_for_a(data)

@op
def transform_b(data):
    return transform_for_b(data)

@op
def combine(a_data, b_data):
    return merge(a_data, b_data)

@job
def fan_out_fan_in():
    raw = load_raw_data()
    validated = validate(raw)
    
    # Fan-out: validated data goes to both transforms
    a = transform_a(validated)
    b = transform_b(validated)
    
    # Fan-in: combine takes outputs from both transforms
    combine(a, b)

Dynamic Execution

Dagster supports dynamic op generation at runtime using DynamicOut:
from dagster import job, op, DynamicOut, DynamicOutput

@op(out=DynamicOut())
def load_files():
    files = ["file1.csv", "file2.csv", "file3.csv"]
    for file in files:
        yield DynamicOutput(file, mapping_key=file.replace(".", "_"))

@op
def process_file(file_name):
    return process(file_name)

@job
def dynamic_pipeline():
    load_files().map(process_file)

Testing

Ops and jobs are easy to test:

def test_add_one():
    # Test an op directly
    result = add_one(5)
    assert result == 6

def test_job():
    # Test a job execution
    result = do_stuff.execute_in_process()
    assert result.success
    
    final_value = result.output_for_node("add_one")
    assert final_value == 6

When to Use Ops vs Assets

Use assets when:
  • You’re building data pipelines with persistent outputs (tables, files, models)
  • You want automatic lineage tracking and observability
  • You need cross-job dependencies
  • You’re focused on “what data exists” rather than “what tasks run”
Use ops and jobs when:
  • You’re building imperative workflows without persistent outputs
  • You need fine-grained control over execution order
  • You’re migrating from task-based orchestrators like Airflow
  • Your computation is purely procedural (e.g., sending notifications, running commands)

Best Practices

Keep ops focused: Each op should do one thing well. Break complex operations into multiple ops that can be tested and reused independently.
Use type annotations: Add type hints to op inputs and outputs for better validation and documentation.
Leverage resources: Use resources for external services (databases, APIs) to enable testing with mocks.
