Durable tasks are tasks whose execution state is checkpointed by Hatchet. If the worker process crashes or is restarted mid-execution, Hatchet replays the task from the last checkpoint rather than re-running it from scratch.
Use durable tasks when you need to:
Sleep for minutes, hours, or days without holding a worker slot the entire time
Wait for an external event before continuing (e.g. a webhook, user action, or approval)
Build long-lived workflows that span multiple processes or deployments
Durable tasks must be async functions in Python and TypeScript. Internally, Hatchet persists each await point and can replay the coroutine from that checkpoint.
Define a durable task
Use @hatchet.durable_task() instead of @hatchet.task(). The second argument to the function must be DurableContext rather than Context. from hatchet_sdk import DurableContext, EmptyModel, Hatchet
hatchet = Hatchet( debug = True )
@hatchet.durable_task ()
async def simple_durable ( input : EmptyModel, ctx : DurableContext) -> dict[ str , str ]:
return { "result" : "Hello, world!" }
You can also add durable tasks to a workflow object: from datetime import timedelta
from hatchet_sdk import Context, DurableContext, EmptyModel, Hatchet
hatchet = Hatchet( debug = True )
durable_workflow = hatchet.workflow( name = "DurableWorkflow" )
@durable_workflow.task ()
async def ephemeral_task ( input : EmptyModel, ctx : Context) -> None :
print ( "Running non-durable task" )
@durable_workflow.durable_task ()
async def durable_task ( input : EmptyModel, ctx : DurableContext) -> dict :
# durable_task checkpoints are persisted
return { "status" : "done" }
Call hatchet.durableTask() or workflow.durableTask(). The fn receives a DurableContext as its second argument. import { hatchet } from './hatchet-client' ;
export const durableWorkflow = hatchet . workflow ({
name: 'durable-workflow' ,
});
durableWorkflow . durableTask ({
name: 'durable_task' ,
executionTimeout: '10m' ,
fn : async ( _input , ctx ) => {
return { status: 'done' };
},
});
Standalone durable tasks use hatchet.durableTask() directly: export const waitForSleepTwice = hatchet . durableTask ({
name: 'wait-for-sleep-twice' ,
executionTimeout: '10m' ,
fn : async ( _input , ctx ) => {
// ctx has sleepFor, waitForEvent, waitFor, now
return { runtime: 0 };
},
});
Use client.NewStandaloneDurableTask() instead of NewStandaloneTask(). The handler receives a hatchet.DurableContext. package main
import (
" log "
" time "
" github.com/hatchet-dev/hatchet/pkg/cmdutils "
hatchet " github.com/hatchet-dev/hatchet/sdks/go "
)
type DurableInput struct {
Message string `json:"message"`
Delay int `json:"delay"`
}
type DurableOutput struct {
ProcessedAt string `json:"processed_at"`
Message string `json:"message"`
}
func main () {
client , err := hatchet . NewClient ()
if err != nil {
log . Fatalf ( "failed to create hatchet client: %v " , err )
}
task := client . NewStandaloneDurableTask (
"long-running-task" ,
func ( ctx hatchet . DurableContext , input DurableInput ) ( DurableOutput , error ) {
// ctx.SleepFor, ctx.WaitForEvent, etc. are available here
return DurableOutput {
ProcessedAt : time . Now (). Format ( time . RFC3339 ),
Message : "Processed: " + input . Message ,
}, nil
},
)
_ = task
}
Sleep for a duration
ctx.aio_sleep_for() (Python) / ctx.sleepFor() (TypeScript) / ctx.SleepFor() (Go) pauses execution for a fixed duration. The worker slot is released during the sleep — the task resumes on any available worker when the timer fires.
import time
from datetime import timedelta
from pydantic import BaseModel
from hatchet_sdk import DurableContext, EmptyModel, Hatchet
hatchet = Hatchet( debug = True )
EVENT_KEY = "durable-example:event"
SLEEP_TIME = 5
class AwaitedEvent ( BaseModel ):
id : str
durable_workflow = hatchet.workflow( name = "DurableWorkflow" )
@durable_workflow.durable_task ()
async def durable_task (
input : EmptyModel, ctx : DurableContext
) -> dict[ str , str | int ]:
print ( "Waiting for sleep" )
sleep = await ctx.aio_sleep_for( duration = timedelta( seconds = SLEEP_TIME ))
print ( "Sleep finished" )
return {
"status" : "success" ,
"sleep_duration_seconds" : sleep.duration.seconds,
}
Pass a duration string such as "5s", "2m", or "1h". import { hatchet } from './hatchet-client' ;
export const SLEEP_TIME = '2s' ;
export const durableWorkflow = hatchet . workflow ({
name: 'durable-workflow' ,
});
durableWorkflow . durableTask ({
name: 'durable_task' ,
executionTimeout: '10m' ,
fn : async ( _input , ctx ) => {
console . log ( 'Waiting for sleep' );
const sleepResult = await ctx . sleepFor ( SLEEP_TIME );
console . log ( 'Sleep finished' );
return {
status: 'success' ,
sleep_duration_ms: sleepResult . durationMs ,
};
},
});
package main
import (
" log "
" time "
hatchet " github.com/hatchet-dev/hatchet/sdks/go "
)
task := client . NewStandaloneDurableTask (
"long-running-task" ,
func ( ctx hatchet . DurableContext , input DurableInput ) ( DurableOutput , error ) {
log . Printf ( "Starting task, will sleep for %d seconds" , input . Delay )
if _ , err := ctx . SleepFor (
time . Duration ( input . Delay ) * time . Second ,
); err != nil {
return DurableOutput {}, err
}
log . Printf ( "Finished sleeping, processing message: %s " , input . Message )
return DurableOutput {
ProcessedAt : time . Now (). Format ( time . RFC3339 ),
Message : "Processed: " + input . Message ,
}, nil
},
)
Wait for an external event
ctx.aio_wait_for_event() (Python) / ctx.waitForEvent() (TypeScript) / ctx.WaitForEvent() (Go) suspends the task until Hatchet receives a matching event pushed via hatchet.event.push().
from pydantic import BaseModel
from hatchet_sdk import DurableContext, EmptyModel, Hatchet
from datetime import timedelta
hatchet = Hatchet( debug = True )
EVENT_KEY = "durable-example:event"
class AwaitedEvent ( BaseModel ):
id : str
durable_workflow = hatchet.workflow( name = "DurableWorkflow" )
@durable_workflow.durable_task ()
async def durable_task (
input : EmptyModel, ctx : DurableContext
) -> dict[ str , str | int ]:
# Wait for a matching event
event = await ctx.aio_wait_for_event(
EVENT_KEY , "true" , payload_validator = AwaitedEvent
)
print ( "Event received" )
return {
"status" : "success" ,
"event_id" : event.id,
}
Push the event from your application to unblock the waiting task: import time
from worker import EVENT_KEY , SLEEP_TIME , durable_workflow, hatchet
from pydantic import BaseModel
class AwaitedEvent ( BaseModel ):
id : str
durable_workflow.run_no_wait()
print ( "Sleeping" )
time.sleep( SLEEP_TIME + 2 )
print ( "Pushing event" )
hatchet.event.push( EVENT_KEY , AwaitedEvent( id = "123" ).model_dump( mode = "json" ))
durableWorkflow . durableTask ({
name: 'durable_task' ,
executionTimeout: '10m' ,
fn : async ( _input , ctx ) => {
console . log ( 'Waiting for event' );
const event = await ctx . waitForEvent ( EVENT_KEY , 'true' );
console . log ( 'Event received' );
return {
status: 'success' ,
event: event ,
};
},
});
Pass an optional CEL filter expression as the second argument. Pass an empty string to match any payload. task := client . NewStandaloneDurableTask (
"long-running-task" ,
func ( ctx hatchet . DurableContext , input DurableInput ) ( DurableOutput , error ) {
// Wait for any payload on this event key
if _ , err := ctx . WaitForEvent ( "user:updated" , "" ); err != nil {
return DurableOutput {}, err
}
return DurableOutput {
ProcessedAt : time . Now (). Format ( time . RFC3339 ),
Message : "Processed: " + input . Message ,
}, nil
},
)
Add a CEL filter to only resume when the event payload matches: // Only resume when status_code == 200
if _ , err := ctx . WaitForEvent (
"user:updated" ,
"input.status_code == 200" ,
); err != nil {
return DurableOutput {}, err
}
Combining conditions with or_ / Or
Use ctx.aio_wait_for() (Python) / ctx.waitFor() (TypeScript) with an or_() combinator to resume when the first of several conditions is met.
from uuid import uuid4
from datetime import timedelta
from hatchet_sdk import (
DurableContext, EmptyModel, Hatchet,
SleepCondition, UserEventCondition, or_,
)
hatchet = Hatchet( debug = True )
EVENT_KEY = "durable-example:event"
SLEEP_TIME = 5
durable_workflow = hatchet.workflow( name = "DurableWorkflow" )
@durable_workflow.durable_task ()
async def wait_for_or_group_1 (
_i : EmptyModel, ctx : DurableContext
) -> dict[ str , str | int | float ]:
wait_result = await ctx.aio_wait_for(
uuid4().hex,
or_(
SleepCondition(timedelta( seconds = SLEEP_TIME )),
UserEventCondition( event_key = EVENT_KEY ),
),
)
return { "key" : list (wait_result.keys())[ 0 ]}
import { Or , SleepCondition , UserEventCondition } from '@hatchet/v1/conditions' ;
durableWorkflow . durableTask ({
name: 'wait_for_or_group_1' ,
executionTimeout: '10m' ,
fn : async ( _input , ctx ) => {
const waitResult = await ctx . waitFor (
Or (
new SleepCondition ( '2s' , 'sleep' ),
new UserEventCondition ( EVENT_KEY , '' , 'event' ),
)
);
return { result: waitResult };
},
});
DAG-level wait and skip conditions
For tasks inside a DAG workflow you can use wait_for and skip_if on @workflow.task() declarations (no DurableContext required). These are evaluated by Hatchet before dispatching the task.
from datetime import timedelta
from hatchet_sdk import (
Context, EmptyModel, Hatchet,
ParentCondition, SleepCondition, UserEventCondition, or_,
)
import random
from pydantic import BaseModel
hatchet = Hatchet( debug = True )
class StepOutput ( BaseModel ):
random_number: int
task_condition_workflow = hatchet.workflow( name = "TaskConditionWorkflow" )
@task_condition_workflow.task ()
def start ( input : EmptyModel, ctx : Context) -> StepOutput:
return StepOutput( random_number = random.randint( 1 , 100 ))
# Wait for a timer OR an event before running
@task_condition_workflow.task (
parents = [start],
wait_for = [
or_(
SleepCondition( duration = timedelta( minutes = 1 )),
UserEventCondition( event_key = "wait_for_event:start" ),
)
],
)
def wait_for_event ( input : EmptyModel, ctx : Context) -> StepOutput:
return StepOutput( random_number = random.randint( 1 , 100 ))
# Skip this task if start emitted a number > 0
@task_condition_workflow.task (
parents = [start],
wait_for = [SleepCondition(timedelta( seconds = 30 ))],
skip_if = [UserEventCondition( event_key = "skip_on_event:skip" )],
)
def skip_on_event ( input : EmptyModel, ctx : Context) -> StepOutput:
return StepOutput( random_number = random.randint( 1 , 100 ))
import { Or } from '@hatchet/v1/conditions' ;
import { hatchet } from './hatchet-client' ;
export const dagWithConditions = hatchet . workflow ({ name: 'simple' });
const firstTask = dagWithConditions . task ({
name: 'first-task' ,
fn : async () => ({ Completed: true }),
});
dagWithConditions . task ({
name: 'second-task' ,
parents: [ firstTask ],
waitFor: Or ({ eventKey: 'user:event' }, { sleepFor: '10s' }),
fn : async ( _ , ctx ) => {
console . log ( 'triggered by condition' , ctx . triggers ());
return { Completed: true };
},
});
Register durable tasks on a worker
Durable tasks require dedicated durable slots on the worker. Use durable_slots (Python) or WithDurableSlots (Go).
from hatchet_sdk import Hatchet
from worker import durable_workflow, simple_durable
hatchet = Hatchet()
def main () -> None :
worker = hatchet.worker(
"durable-worker" ,
workflows = [durable_workflow, simple_durable],
)
worker.start()
if __name__ == "__main__" :
main()
import { hatchet } from './hatchet-client' ;
import { durableWorkflow } from './workflow' ;
async function main () {
const worker = await hatchet . worker ( 'durable-worker' , {
workflows: [ durableWorkflow ],
});
await worker . start ();
}
if ( require . main === module ) { main (); }
worker , err := client . NewWorker (
"durable-worker" ,
hatchet . WithWorkflows ( task ),
hatchet . WithDurableSlots ( 10 ),
)
if err != nil {
log . Fatalf ( "failed to create worker: %v " , err )
}
interruptCtx , cancel := cmdutils . NewInterruptContext ()
defer cancel ()
if err := worker . StartBlocking ( interruptCtx ); err != nil {
log . Fatalf ( "failed to start worker: %v " , err )
}
Condition reference
Condition Python TypeScript Description Sleep timer SleepCondition(timedelta(seconds=N))new SleepCondition('Ns')Resumes after a fixed duration. User event UserEventCondition(event_key="key")new UserEventCondition('key')Resumes when an event with the given key is pushed. Parent output ParentCondition(parent=fn, expression="...")— Evaluates a CEL expression against a parent task’s output. Or combinator or_(cond_a, cond_b)Or(condA, condB)Resumes when the first condition is met.
Next steps
Child workflows Spawn sub-workflows from inside a durable task.
DAG workflows Build task graphs with explicit dependencies.