A task is the basic unit of work in Hatchet. It is a function that Hatchet executes reliably — with retries, timeouts, concurrency limits, and full observability built in. Tasks are registered on a worker process and triggered from your application code.
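To build intuition for what "executes reliably" means, here is a toy, stdlib-only sketch of the retry-plus-timeout loop a task queue performs on your behalf. This is purely illustrative and is not Hatchet's implementation; Hatchet additionally persists state, queues work across processes, and records every attempt.

```python
import asyncio

# Toy model of reliable execution: retry a failing function a fixed number
# of times, enforcing a per-attempt timeout. Illustrative only -- NOT how
# Hatchet is implemented internally.
async def run_with_retries(fn, retries: int = 3, timeout: float = 1.0):
    last_exc: Exception | None = None
    for _ in range(retries + 1):  # the first attempt plus `retries` retries
        try:
            return await asyncio.wait_for(fn(), timeout=timeout)
        except Exception as exc:  # includes per-attempt timeouts
            last_exc = exc
    raise last_exc

attempts = 0

async def flaky() -> str:
    # Fails transiently on the first two attempts, then succeeds.
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(run_with_retries(flaky, retries=3))
```

With Hatchet, this loop (and much more) is handled by the platform, so your task function only contains the business logic.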
Define a task
Use the @hatchet.task() decorator to turn any function into a Hatchet task. The function receives a typed input model and a Context object.

```python
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()

@hatchet.task()
def simple(input: EmptyModel, ctx: Context) -> dict[str, str]:
    return {"result": "Hello, world!"}
```
To accept structured input, pass a Pydantic model as input_validator:

```python
from pydantic import BaseModel

from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()

class SimpleInput(BaseModel):
    message: str

class SimpleOutput(BaseModel):
    transformed_message: str

@hatchet.task(name="first-task", input_validator=SimpleInput)
def first_task(input: SimpleInput, ctx: Context) -> SimpleOutput:
    return SimpleOutput(transformed_message=input.message.lower())
```
Call hatchet.task() with a name and an fn implementation. Input and output types are inferred from the function signature.

```typescript
import { hatchet } from './hatchet-client';

export type SimpleInput = {
  Message: string;
};

export const simple = hatchet.task({
  name: 'simple',
  retries: 3,
  fn: async (input: SimpleInput) => {
    return {
      TransformedMessage: input.Message.toLowerCase(),
    };
  },
});
```
Call client.NewStandaloneTask() with a name and a handler function. Input and output are plain structs.

```go
package main

import (
	"log"

	hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

type SimpleInput struct {
	Message string `json:"message"`
}

type SimpleOutput struct {
	Result string `json:"result"`
}

func main() {
	client, err := hatchet.NewClient()
	if err != nil {
		log.Fatalf("failed to create hatchet client: %v", err)
	}

	task := client.NewStandaloneTask(
		"process-message",
		func(ctx hatchet.Context, input SimpleInput) (SimpleOutput, error) {
			return SimpleOutput{
				Result: "Processed: " + input.Message,
			}, nil
		},
	)

	_ = task
}
```
Task options
These parameters are available on @hatchet.task() (Python), hatchet.task() (TypeScript), and NewStandaloneTask() / NewTask() options (Go).

name
The name of the task. In Python, defaults to the decorated function's name when omitted. In TypeScript and Go, required.

retries
Number of times to retry the task after a failure before marking it as failed.

execution_timeout
Maximum wall-clock time the task function may run. In Python, accepts a timedelta. In TypeScript, a duration string such as "5m". The task is cancelled if it exceeds this limit.

schedule_timeout
Maximum time Hatchet will wait for a worker slot to become available; if none frees up in time, the run is marked as timed out before it starts.

concurrency
int | ConcurrencyExpression
Limits how many runs of this task may execute simultaneously. Pass an integer for a simple cap, or a ConcurrencyExpression to group runs by a CEL expression (e.g. input.user_id).

rate_limits
A list of rate limit configurations that throttle how often the task may be dispatched.

backoff_factor
Multiplier applied to the retry delay for exponential backoff. Used together with backoff_max_seconds.

backoff_max_seconds
Maximum number of seconds between retries when exponential backoff is enabled.
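To see how backoff_factor and backoff_max_seconds interact, here is a small sketch of a capped exponential backoff schedule. The exact formula Hatchet uses is not specified above; this assumes the common scheme delay = min(backoff_factor ** attempt, backoff_max_seconds), which is an illustration rather than a statement about Hatchet's internals.

```python
# Hypothetical capped exponential backoff: delays grow geometrically with
# the attempt number, then flatten at backoff_max_seconds.
def retry_delay(attempt: int, backoff_factor: float, backoff_max_seconds: float) -> float:
    return min(backoff_factor ** attempt, backoff_max_seconds)

# Delay before each retry for attempts 1..8 with factor 2 and a 60s cap.
schedule = [
    retry_delay(n, backoff_factor=2.0, backoff_max_seconds=60.0)
    for n in range(1, 9)
]
```

The cap is what makes the combination safe: without backoff_max_seconds, the wait between late retries would grow without bound.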
Register on a worker and start
A worker is a long-running process that connects to Hatchet and pulls work from the queue. Register your tasks when creating the worker.
```python
from hatchet_sdk import Hatchet

from worker import simple

hatchet = Hatchet()

def main() -> None:
    worker = hatchet.worker(
        "test-worker",
        workflows=[simple],
    )
    worker.start()

if __name__ == "__main__":
    main()
```
```typescript
import { hatchet } from './hatchet-client';
import { simple } from './workflow';

async function main() {
  const worker = await hatchet.worker('simple-worker', {
    workflows: [simple],
    slots: 100,
  });

  await worker.start();
}

if (require.main === module) {
  main();
}
```
```go
import (
	"log"

	"github.com/hatchet-dev/hatchet/pkg/cmdutils"
	hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

worker, err := client.NewWorker("simple-worker", hatchet.WithWorkflows(task))
if err != nil {
	log.Fatalf("failed to create worker: %v", err)
}

interruptCtx, cancel := cmdutils.NewInterruptContext()
defer cancel()

err = worker.StartBlocking(interruptCtx)
if err != nil {
	log.Fatalf("failed to start worker: %v", err)
}
```
Start the worker in a separate terminal or as a background process. It must be running before you trigger tasks.
Trigger a task
Once your worker is running, call .run() (blocking) or .aio_run() (async) from any part of your application to trigger the task and wait for the result.
```python
import asyncio

from pydantic import BaseModel

from worker import first_task

class SimpleInput(BaseModel):
    message: str

async def main() -> None:
    result = await first_task.aio_run(SimpleInput(message="Hello, World!"))
    print(result["transformed_message"])

if __name__ == "__main__":
    asyncio.run(main())
```
Use .run() for synchronous callers, or .run_no_wait() to fire and forget:

```python
# Fire and forget: returns immediately without waiting for the result
simple.run_no_wait()
```
```typescript
import { simple } from './workflow';

async function main() {
  const res = await simple.run({
    Message: 'Hello, World!',
  });

  console.log(res.TransformedMessage);
}

main().catch(console.error).finally(() => process.exit(0));
```
```go
import (
	"context"
	"fmt"
)

result, err := task.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
if err != nil {
	log.Fatalf("task failed: %v", err)
}

var output SimpleOutput
err = result.Into(&output)
if err != nil {
	log.Fatalf("failed to parse output: %v", err)
}

fmt.Println(output.Result)
```
To trigger without blocking, use RunNoWait and subscribe to the result later:

```go
runRef, err := task.RunNoWait(context.Background(), SimpleInput{Message: "Hello, World!"})
if err != nil {
	log.Fatalf("failed to dispatch task: %v", err)
}

fmt.Println(runRef.RunId)

// Subscribe to the result later
result, err := runRef.Result()
if err != nil {
	log.Fatalf("task failed: %v", err)
}

var output SimpleOutput
err = result.TaskOutput("process-message").Into(&output)
```
Running multiple tasks in parallel
You can fan out multiple task runs concurrently using standard async primitives.
```python
import asyncio

from worker import child_task, SimpleInput

result1 = child_task.aio_run(SimpleInput(message="Hello, World!"))
result2 = child_task.aio_run(SimpleInput(message="Hello, Moon!"))

results = await asyncio.gather(result1, result2)

print(results[0]["transformed_message"])
print(results[1]["transformed_message"])
```
Or use aio_run_many to submit a batch in one call:

```python
greetings = ["Hello, World!", "Hello, Moon!", "Hello, Mars!"]

results = await child_task.aio_run_many(
    [
        child_task.create_bulk_run_item(
            input=SimpleInput(message=greeting),
        )
        for greeting in greetings
    ]
)

print(results)
```
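The fan-out pattern above is ordinary asyncio: each call produces an awaitable, and asyncio.gather runs them concurrently and returns results in submission order. A stdlib-only illustration of the same shape, with a local coroutine standing in for the round trip to a worker:

```python
import asyncio

# Local stand-in for a task run; the real call would go through Hatchet.
async def transform(message: str) -> dict[str, str]:
    await asyncio.sleep(0)  # simulate the asynchronous round trip
    return {"transformed_message": message.lower()}

async def main() -> list[dict[str, str]]:
    greetings = ["Hello, World!", "Hello, Moon!", "Hello, Mars!"]
    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(transform(g) for g in greetings))

results = asyncio.run(main())
```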
```typescript
import { simple } from './workflow';

const res1 = simple.run({ Message: 'HeLlO WoRlD' });
const res2 = simple.run({ Message: 'Hello MoOn' });

const results = await Promise.all([res1, res2]);

console.log(results[0].TransformedMessage);
console.log(results[1].TransformedMessage);
```
```go
import "sync"

var results []string
var mu sync.Mutex
var wg sync.WaitGroup

wg.Add(2)

go func() {
	defer wg.Done()

	result, err := task.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
	if err != nil {
		return
	}

	var out SimpleOutput
	result.Into(&out)

	mu.Lock()
	results = append(results, out.Result)
	mu.Unlock()
}()

go func() {
	defer wg.Done()

	result, err := task.Run(context.Background(), SimpleInput{Message: "Hello, Moon!"})
	if err != nil {
		return
	}

	var out SimpleOutput
	result.Into(&out)

	mu.Lock()
	results = append(results, out.Result)
	mu.Unlock()
}()

wg.Wait()
```
Next steps
DAG workflows: chain tasks together with explicit dependencies.
Durable execution: build tasks that sleep, wait for events, and survive restarts.
Child workflows: spawn sub-workflows from inside a running task.
Flow control: add concurrency limits and rate limits.