Skip to main content
A DAG workflow is a collection of tasks where some tasks depend on the output of others. Hatchet executes tasks in topological order — tasks with no dependencies run immediately, and downstream tasks run only after all their parents have completed successfully. DAGs let you express complex processing pipelines declaratively: you describe what depends on what, and Hatchet handles the scheduling.

Define a workflow and add tasks

Call hatchet.workflow() to create a workflow object, then decorate functions with @workflow.task(). Declare dependencies with the parents argument.
worker.py
import random
from datetime import timedelta
from pydantic import BaseModel
from hatchet_sdk import Context, EmptyModel, Hatchet

# Hatchet client used to declare the workflow below.
# NOTE(review): debug=True presumably enables verbose SDK logging — confirm in the SDK docs.
hatchet = Hatchet(debug=True)

# Output model for the root tasks step1 and step2: one random integer.
class StepOutput(BaseModel):
    random_number: int  # produced with random.randint(1, 100) by step1/step2

# Output model for a task that adds its parents' random numbers (step3 in this guide).
class RandomSum(BaseModel):
    sum: int

# The DAG that every task below attaches itself to via @dag_workflow.task(...).
dag_workflow = hatchet.workflow(name="DAGWorkflow")

@dag_workflow.task(execution_timeout=timedelta(seconds=5))
def step1(input: EmptyModel, ctx: Context) -> StepOutput:
    """Root task with no parents: pick a random integer in the range 1..100.

    Because it declares no parents, Hatchet can start it as soon as the run
    begins. execution_timeout caps its execution time at five seconds.
    """
    picked = random.randint(1, 100)
    return StepOutput(random_number=picked)

@dag_workflow.task(execution_timeout=timedelta(seconds=5))
async def step2(input: EmptyModel, ctx: Context) -> StepOutput:
    """Second root task (async): pick another random integer in 1..100.

    It has no parents either, so it is eligible to run alongside step1.
    """
    drawn = random.randint(1, 100)
    return StepOutput(random_number=drawn)

Add tasks with parent dependencies

Pass the parent task functions in the parents list. Inside the child task, call ctx.task_output(parent_fn) to get the typed output of a parent.
worker.py
@dag_workflow.task(parents=[step1, step2])
async def step3(input: EmptyModel, ctx: Context) -> RandomSum:
    """Add together the random numbers produced by step1 and step2.

    Both are listed as parents, so their outputs are available here.
    ctx.task_output(parent) returns each parent's typed StepOutput.
    """
    first = ctx.task_output(step1)
    second = ctx.task_output(step2)
    total = first.random_number + second.random_number
    return RandomSum(sum=total)

@dag_workflow.task(parents=[step1, step3])
async def step4(input: EmptyModel, ctx: Context) -> dict[str, str]:
    """Print the outputs of step1 and step3, then return a fixed marker dict."""
    step1_result = ctx.task_output(step1)
    step3_result = ctx.task_output(step3)
    print(step1_result, step3_result)
    return dict(step4="step4")

Full example: multi-step DAG

The following example builds a four-step DAG. Steps 1 and 2 have no parents, so they start immediately and run in parallel. Step 3 depends on both and adds their random numbers together. The final step (step 4) depends on steps 1 and 3.
worker.py
import random
from datetime import timedelta
from pydantic import BaseModel
from hatchet_sdk import Context, EmptyModel, Hatchet

# Hatchet client used to declare the workflow and create the worker below.
# NOTE(review): debug=True presumably enables verbose SDK logging — confirm in the SDK docs.
hatchet = Hatchet(debug=True)

# Output model for the root tasks step1 and step2: one random integer.
class StepOutput(BaseModel):
    random_number: int  # produced with random.randint(1, 100) by step1/step2

# Output model for step3: the sum of step1's and step2's random numbers.
class RandomSum(BaseModel):
    sum: int

# The DAG that every task below attaches itself to via @dag_workflow.task(...).
dag_workflow = hatchet.workflow(name="DAGWorkflow")

@dag_workflow.task(execution_timeout=timedelta(seconds=5))
def step1(input: EmptyModel, ctx: Context) -> StepOutput:
    """Root task: emit a random integer between 1 and 100 (five-second timeout)."""
    value = random.randint(1, 100)
    return StepOutput(random_number=value)

@dag_workflow.task(execution_timeout=timedelta(seconds=5))
async def step2(input: EmptyModel, ctx: Context) -> StepOutput:
    """Async root task: emit a random integer between 1 and 100 (five-second timeout)."""
    value = random.randint(1, 100)
    return StepOutput(random_number=value)

@dag_workflow.task(parents=[step1, step2])
async def step3(input: EmptyModel, ctx: Context) -> RandomSum:
    """Sum the random numbers emitted by the two root tasks, step1 and step2."""
    addends = (ctx.task_output(step1), ctx.task_output(step2))
    return RandomSum(sum=addends[0].random_number + addends[1].random_number)

@dag_workflow.task(parents=[step1, step3])
async def step4(input: EmptyModel, ctx: Context) -> dict[str, str]:
    """Terminal task: print step1's and step3's outputs and return a marker dict."""
    parent_outputs = (ctx.task_output(step1), ctx.task_output(step3))
    print(*parent_outputs)
    return {"step4": "step4"}

def main() -> None:
    """Register dag_workflow on a worker named "dag-worker" and start it."""
    hatchet.worker("dag-worker", workflows=[dag_workflow]).start()


if __name__ == "__main__":
    main()

Trigger a DAG workflow

trigger.py
from worker import dag_workflow

# Trigger one run of the DAG imported from worker.py; the return value is discarded.
# NOTE(review): presumably run() blocks until the run finishes and returns its result — confirm in the SDK docs.
dag_workflow.run()

Conditional task execution

Use skip_if (Python) or skipIf (TypeScript) to conditionally skip a task based on a parent’s output or an external event. A skipped task is not considered a failure.
ParentCondition evaluates a CEL expression against the named parent’s output, which is exposed to the expression as `output` (for example, `output.random_number > 50`). If the expression evaluates to true, the task is skipped.
worker.py
from hatchet_sdk import (
    Context, EmptyModel, Hatchet, ParentCondition, SleepCondition, UserEventCondition, or_
)
from pydantic import BaseModel
import random
from datetime import timedelta

hatchet = Hatchet(debug=True)

# Output model for the tasks that each emit a single random integer.
class StepOutput(BaseModel):
    random_number: int  # produced with random.randint(1, 100) by the tasks below

# Output model for the final `sum` task. That task's return annotation names
# RandomSum and is evaluated when the function is defined, so the model must
# exist in this module; otherwise defining the task raises NameError.
class RandomSum(BaseModel):
    sum: int

# Workflow that demonstrates wait_for / skip_if conditions on its tasks.
task_condition_workflow = hatchet.workflow(name="TaskConditionWorkflow")

@task_condition_workflow.task()
def start(input: EmptyModel, ctx: Context) -> StepOutput:
    """Entry task: emit a random integer in 1..100 for the downstream conditions."""
    value = random.randint(1, 100)
    return StepOutput(random_number=value)

# Child of `start` that additionally waits on a 10-second SleepCondition before running.
@task_condition_workflow.task(
    parents=[start],
    wait_for=[SleepCondition(timedelta(seconds=10))],
)
def wait_for_sleep(input: EmptyModel, ctx: Context) -> StepOutput:
    """Emit a fresh random integer in 1..100 once the wait condition is met.

    The left_branch / right_branch skip conditions test this output.
    """
    result = random.randint(1, 100)
    return StepOutput(random_number=result)

# skip_if with several parents: skip this task when start's random_number > 0.
# start draws from random.randint(1, 100), so this condition always holds and
# the task is always skipped; it exists to demonstrate the API.
@task_condition_workflow.task(
    parents=[start, wait_for_sleep],
    skip_if=[ParentCondition(parent=start, expression="output.random_number > 0")],
)
def skip_with_multiple_parents(input: EmptyModel, ctx: Context) -> StepOutput:
    """Emit a random integer in 1..100 (only reached if the skip condition is false)."""
    number = random.randint(1, 100)
    return StepOutput(random_number=number)

# Left branch: taken when wait_for_sleep's random_number is <= 50
# (the task is skipped when that value is > 50).
@task_condition_workflow.task(
    parents=[wait_for_sleep],
    skip_if=[
        ParentCondition(parent=wait_for_sleep, expression="output.random_number > 50"),
    ],
)
def left_branch(input: EmptyModel, ctx: Context) -> StepOutput:
    """Emit a new random integer in 1..100 when the left branch is taken."""
    choice = random.randint(1, 100)
    return StepOutput(random_number=choice)

# Right branch: taken when wait_for_sleep's random_number is > 50
# (the task is skipped when that value is <= 50). It is the complement of left_branch.
@task_condition_workflow.task(
    parents=[wait_for_sleep],
    skip_if=[
        ParentCondition(parent=wait_for_sleep, expression="output.random_number <= 50"),
    ],
)
def right_branch(input: EmptyModel, ctx: Context) -> StepOutput:
    """Emit a new random integer in 1..100 when the right branch is taken."""
    choice = random.randint(1, 100)
    return StepOutput(random_number=choice)
Check ctx.was_skipped(task_fn) before reading a potentially-skipped task’s output:
@task_condition_workflow.task(
    parents=[start, wait_for_sleep, left_branch, right_branch],
)
def sum(input: EmptyModel, ctx: Context) -> RandomSum:
    """Add up every parent's random number, counting a skipped branch as 0.

    left_branch and right_branch have complementary skip conditions on the
    same wait_for_sleep output, so exactly one of them runs. Each is checked
    with ctx.was_skipped() before its output is read. start and wait_for_sleep
    declare no skip condition and are read directly.

    Note: naming this task ``sum`` shadows the builtin at module level, so the
    builtin sum() must not be called here. The total is accumulated with ``+``.
    """
    def branch_value(branch) -> int:
        # Don't read the output of a skipped task; it contributes 0 instead.
        if ctx.was_skipped(branch):
            return 0
        return ctx.task_output(branch).random_number

    total = ctx.task_output(start).random_number
    total += ctx.task_output(wait_for_sleep).random_number
    total += branch_value(left_branch)
    total += branch_value(right_branch)
    return RandomSum(sum=total)
Use wait_for to delay a task until a condition is met (e.g. a timer or external event). Use skip_if to conditionally bypass a task entirely based on parent output.

Next steps

Durable execution

Sleep and wait for events inside a task, surviving restarts.

Child workflows

Spawn sub-workflows dynamically from within a task.

Build docs developers (and LLMs) love