Durable tasks are tasks whose execution state is checkpointed by Hatchet. If the worker process crashes or is restarted mid-execution, Hatchet replays the task from the last checkpoint rather than re-running it from scratch. Use durable tasks when you need to:
  • Sleep for minutes, hours, or days without holding a worker slot the entire time
  • Wait for an external event before continuing (e.g. a webhook, user action, or approval)
  • Build long-lived workflows that span multiple processes or deployments
Durable tasks must be async functions in Python and TypeScript. Internally, Hatchet persists each await point and can replay the coroutine from that checkpoint.

Define a durable task

Use @hatchet.durable_task() instead of @hatchet.task(). The second argument to the function must be DurableContext rather than Context.
worker.py
from hatchet_sdk import DurableContext, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)

@hatchet.durable_task()
async def simple_durable(input: EmptyModel, ctx: DurableContext) -> dict[str, str]:
    return {"result": "Hello, world!"}
You can also add durable tasks to a workflow object:
worker.py
from datetime import timedelta
from hatchet_sdk import Context, DurableContext, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)

durable_workflow = hatchet.workflow(name="DurableWorkflow")

@durable_workflow.task()
async def ephemeral_task(input: EmptyModel, ctx: Context) -> None:
    print("Running non-durable task")

@durable_workflow.durable_task()
async def durable_task(input: EmptyModel, ctx: DurableContext) -> dict:
    # durable_task checkpoints are persisted
    return {"status": "done"}

Sleep for a duration

ctx.aio_sleep_for() (Python) / ctx.sleepFor() (TypeScript) / ctx.SleepFor() (Go) pauses execution for a fixed duration. The worker slot is released during the sleep — the task resumes on any available worker when the timer fires.
worker.py
from datetime import timedelta
from hatchet_sdk import DurableContext, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)

SLEEP_TIME = 5

durable_workflow = hatchet.workflow(name="DurableWorkflow")

@durable_workflow.durable_task()
async def durable_task(
    input: EmptyModel, ctx: DurableContext
) -> dict[str, str | int]:
    print("Waiting for sleep")
    sleep = await ctx.aio_sleep_for(duration=timedelta(seconds=SLEEP_TIME))
    print("Sleep finished")

    return {
        "status": "success",
        "sleep_duration_seconds": sleep.duration.seconds,
    }

Wait for an external event

ctx.aio_wait_for_event() (Python) / ctx.waitForEvent() (TypeScript) / ctx.WaitForEvent() (Go) suspends the task until Hatchet receives a matching event pushed via hatchet.event.push().
worker.py
from pydantic import BaseModel
from hatchet_sdk import DurableContext, EmptyModel, Hatchet
from datetime import timedelta

hatchet = Hatchet(debug=True)

EVENT_KEY = "durable-example:event"

class AwaitedEvent(BaseModel):
    id: str

durable_workflow = hatchet.workflow(name="DurableWorkflow")

@durable_workflow.durable_task()
async def durable_task(
    input: EmptyModel, ctx: DurableContext
) -> dict[str, str | int]:
    # Wait for a matching event
    event = await ctx.aio_wait_for_event(
        EVENT_KEY, "true", payload_validator=AwaitedEvent
    )
    print("Event received")

    return {
        "status": "success",
        "event_id": event.id,
    }
Push the event from your application to unblock the waiting task:
trigger.py
import time
from worker import EVENT_KEY, SLEEP_TIME, AwaitedEvent, durable_workflow, hatchet

durable_workflow.run_no_wait()

print("Sleeping")
time.sleep(SLEEP_TIME + 2)

print("Pushing event")
hatchet.event.push(EVENT_KEY, AwaitedEvent(id="123").model_dump(mode="json"))

Combining conditions with or_ / Or

Use ctx.aio_wait_for() (Python) / ctx.waitFor() (TypeScript) with an or_() combinator to resume when the first of several conditions is met.
worker.py
from uuid import uuid4
from datetime import timedelta
from hatchet_sdk import (
    DurableContext, EmptyModel, Hatchet,
    SleepCondition, UserEventCondition, or_,
)

hatchet = Hatchet(debug=True)
EVENT_KEY = "durable-example:event"
SLEEP_TIME = 5

durable_workflow = hatchet.workflow(name="DurableWorkflow")

@durable_workflow.durable_task()
async def wait_for_or_group_1(
    _i: EmptyModel, ctx: DurableContext
) -> dict[str, str | int | float]:
    wait_result = await ctx.aio_wait_for(
        uuid4().hex,
        or_(
            SleepCondition(timedelta(seconds=SLEEP_TIME)),
            UserEventCondition(event_key=EVENT_KEY),
        ),
    )
    return {"key": list(wait_result.keys())[0]}
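
To exercise the event branch of the or_ group, push the matching event before the timer fires. A minimal trigger sketch (assumes the worker above is running and a Hatchet client token is configured; `push_event` is a hypothetical helper name):

```python
# trigger_or_group.py — unblock the or_ group via its event branch.
EVENT_KEY = "durable-example:event"  # must match the UserEventCondition in the worker

def push_event() -> None:
    from hatchet_sdk import Hatchet  # imported here so this module loads without a configured client

    hatchet = Hatchet()
    # If this event arrives before the 5-second SleepCondition fires, the task
    # resumes with the event condition as the satisfied key in wait_result;
    # otherwise the sleep branch wins and this event has no effect on the wait.
    hatchet.event.push(EVENT_KEY, {"id": "123"})
```

Call `push_event()` from your application; either branch resolves the wait, and `wait_result` records which condition was satisfied.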

DAG-level wait and skip conditions

For tasks inside a DAG workflow you can use wait_for and skip_if on @workflow.task() declarations (no DurableContext required). These are evaluated by Hatchet before dispatching the task.
worker.py
from datetime import timedelta
from hatchet_sdk import (
    Context, EmptyModel, Hatchet,
    SleepCondition, UserEventCondition, or_,
)
import random
from pydantic import BaseModel

hatchet = Hatchet(debug=True)

class StepOutput(BaseModel):
    random_number: int

task_condition_workflow = hatchet.workflow(name="TaskConditionWorkflow")

@task_condition_workflow.task()
def start(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))

# Wait for a timer OR an event before running
@task_condition_workflow.task(
    parents=[start],
    wait_for=[
        or_(
            SleepCondition(duration=timedelta(minutes=1)),
            UserEventCondition(event_key="wait_for_event:start"),
        )
    ],
)
def wait_for_event(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))

# Skip this task if the skip event is pushed during the 30-second wait
@task_condition_workflow.task(
    parents=[start],
    wait_for=[SleepCondition(timedelta(seconds=30))],
    skip_if=[UserEventCondition(event_key="skip_on_event:skip")],
)
def skip_on_event(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))
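
The conditional tasks above can be exercised by pushing the event keys they listen on. A hedged trigger sketch (assumes the worker is running and a Hatchet client token is configured; `push_condition_events` is a hypothetical helper name):

```python
# Push the events that the conditional DAG tasks above listen for.
# The keys must match the UserEventCondition declarations exactly.
WAIT_EVENT_KEY = "wait_for_event:start"  # resumes wait_for_event before its 1-minute timer
SKIP_EVENT_KEY = "skip_on_event:skip"    # causes skip_on_event to be marked as skipped

def push_condition_events() -> None:
    from hatchet_sdk import Hatchet  # imported here so this module loads without a configured client

    hatchet = Hatchet()
    hatchet.event.push(WAIT_EVENT_KEY, {})
    hatchet.event.push(SKIP_EVENT_KEY, {})
```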

Register durable tasks on a worker

Durable tasks require dedicated durable slots on the worker. Use durable_slots (Python) or WithDurableSlots (Go).
main.py
from hatchet_sdk import Hatchet
from worker import durable_workflow, simple_durable

hatchet = Hatchet()

def main() -> None:
    worker = hatchet.worker(
        "durable-worker",
        slots=10,            # slots for regular tasks
        durable_slots=1000,  # dedicated slots for durable tasks
        workflows=[durable_workflow, simple_durable],
    )
    worker.start()
    worker.start()

if __name__ == "__main__":
    main()

Condition reference

| Condition | Python | TypeScript | Description |
| --- | --- | --- | --- |
| Sleep timer | `SleepCondition(timedelta(seconds=N))` | `new SleepCondition('Ns')` | Resumes after a fixed duration. |
| User event | `UserEventCondition(event_key="key")` | `new UserEventCondition('key')` | Resumes when an event with the given key is pushed. |
| Parent output | `ParentCondition(parent=fn, expression="...")` | — | Evaluates a CEL expression against a parent task’s output. |
| Or combinator | `or_(cond_a, cond_b)` | `Or(condA, condB)` | Resumes when the first condition is met. |
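
ParentCondition has no snippet above; a minimal sketch of gating a task on its parent's output (assumptions: the workflow name and task bodies are illustrative, and the CEL expression is assumed to see the parent's output as `output` — adjust to your SDK version):

```python
# Sketch: skip `gated` whenever `start`'s output satisfies the expression.
PARENT_EXPRESSION = "output.random_number > 50"  # assumed CEL form

def build_workflow():
    import random

    from hatchet_sdk import Context, EmptyModel, Hatchet, ParentCondition

    hatchet = Hatchet()
    wf = hatchet.workflow(name="ParentConditionExample")

    @wf.task()
    def start(input: EmptyModel, ctx: Context) -> dict:
        return {"random_number": random.randint(1, 100)}

    # Evaluated by Hatchet before dispatch: skipped when the expression is true
    @wf.task(
        parents=[start],
        skip_if=[ParentCondition(parent=start, expression=PARENT_EXPRESSION)],
    )
    def gated(input: EmptyModel, ctx: Context) -> dict:
        return {"ran": True}

    return wf
```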

Next steps

Child workflows

Spawn sub-workflows from inside a durable task.

DAG workflows

Build task graphs with explicit dependencies.
