A task is the basic unit of work in Hatchet. It is a function that Hatchet executes reliably — with retries, timeouts, concurrency limits, and full observability built in. Tasks are registered on a worker process and triggered from your application code.

Define a task

Use the @hatchet.task() decorator to turn any function into a Hatchet task. The function receives a typed input model and a Context object.
worker.py
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()

@hatchet.task()
def simple(input: EmptyModel, ctx: Context) -> dict[str, str]:
    return {"result": "Hello, world!"}
To accept structured input, pass a Pydantic model as input_validator:
worker.py
from pydantic import BaseModel
from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()

class SimpleInput(BaseModel):
    message: str

class SimpleOutput(BaseModel):
    transformed_message: str

@hatchet.task(name="first-task", input_validator=SimpleInput)
def first_task(input: SimpleInput, ctx: Context) -> SimpleOutput:
    return SimpleOutput(transformed_message=input.message.lower())
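Because a Hatchet task is just a decorated function, its body can be exercised like ordinary Python before wiring it to a worker. A minimal sketch of the same transform as first_task above, with the undecorated transform helper being hypothetical (for illustration, not part of the SDK):

```python
from pydantic import BaseModel

class SimpleInput(BaseModel):
    message: str

class SimpleOutput(BaseModel):
    transformed_message: str

# Same logic as first_task's body, without the Hatchet decorator,
# so it can be called and unit-tested like any ordinary function.
def transform(input: SimpleInput) -> SimpleOutput:
    return SimpleOutput(transformed_message=input.message.lower())

print(transform(SimpleInput(message="Hello, World!")).transformed_message)
# prints "hello, world!"
```

This is one reason to keep task bodies thin and typed: the queueing concerns live in the decorator, while the logic stays plain and testable.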

Task options

These parameters are available on @hatchet.task() (Python), hatchet.task() (TypeScript), and NewStandaloneTask() / NewTask() options (Go).
name
string
The name of the task. In Python, it defaults to the decorated function’s name when omitted; in TypeScript and Go it is required.
retries
int
default:"0"
Number of times to retry the task on failure before the run is marked as failed.
execution_timeout
Duration
default:"60s"
Maximum wall-clock time the task function may run. In Python, accepts a timedelta. In TypeScript, a duration string such as "5m". The task is cancelled if it exceeds this limit.
schedule_timeout
Duration
default:"5m"
Maximum time Hatchet will wait for an available worker slot; if no slot frees up in time, the run is marked as timed out without ever starting.
concurrency
int | ConcurrencyExpression
Limits how many runs of this task may execute simultaneously. Pass an integer for a simple cap, or a ConcurrencyExpression to group runs by a CEL expression (e.g. input.user_id).
rate_limits
list[RateLimit]
A list of rate limit configurations that throttle how often the task may be dispatched.
backoff_factor
float
Multiplier applied to the retry delay for exponential backoff. Used together with backoff_max_seconds.
backoff_max_seconds
int
Maximum number of seconds between retries when exponential backoff is enabled.
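The interaction of backoff_factor and backoff_max_seconds can be sketched as follows. This retry_delay helper is hypothetical and exists only to illustrate the schedule; the actual delays are computed by Hatchet:

```python
def retry_delay(attempt: int, backoff_factor: float = 2.0, backoff_max_seconds: int = 60) -> float:
    # Exponential backoff: backoff_factor ** attempt seconds,
    # capped at backoff_max_seconds.
    return min(backoff_factor ** attempt, backoff_max_seconds)

# Delays for the first six retry attempts with the values above:
# 2.0, 4.0, 8.0, 16.0, 32.0, then capped at 60
print([retry_delay(n) for n in range(1, 7)])
```

Without the cap, the delay would keep doubling; backoff_max_seconds bounds the worst-case wait between attempts.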

Register on a worker and start

A worker is a long-running process that connects to Hatchet and pulls work from the queue. Register your tasks when creating the worker.
worker.py
# `hatchet` and the `simple` task are defined earlier in this same file

def main() -> None:
    worker = hatchet.worker(
        "test-worker",
        workflows=[simple],
    )
    worker.start()

if __name__ == "__main__":
    main()
Start the worker in a separate terminal or as a background process. It must be running before you trigger tasks.

Trigger a task

Once your worker is running, call .run() (blocking) or .aio_run() (async) from any part of your application to trigger the task and wait for the result.
run.py
import asyncio

from worker import SimpleInput, first_task

async def main() -> None:
    result = await first_task.aio_run(SimpleInput(message="Hello, World!"))
    # first_task returns a SimpleOutput model, so read fields as attributes
    print(result.transformed_message)

if __name__ == "__main__":
    asyncio.run(main())
Use .run() for synchronous callers or .run_no_wait() to fire and forget:
# Fire and forget — returns immediately without waiting for the result
simple.run_no_wait()
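The three trigger styles map onto familiar async patterns. A plain-asyncio sketch (no Hatchet involved) of blocking vs. fire-and-forget, where the local task coroutine stands in for remote work:

```python
import asyncio

async def task(message: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for work done on a remote worker
    return message.lower()

async def main() -> None:
    # Like .aio_run(): await the result directly
    result = await task("Hello, World!")
    print(result)  # prints "hello, world!"

    # Like .run_no_wait(): schedule it and move on without awaiting
    handle = asyncio.create_task(task("Hello, Moon!"))
    # ... do other work here ...
    await handle  # drain before exiting so the loop shuts down cleanly

asyncio.run(main())
```

The difference with Hatchet is that the work runs on a separate worker process and survives the caller; with run_no_wait you give up the inline result but keep full observability through the dashboard.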

Running multiple tasks in parallel

You can fan out multiple task runs concurrently using standard async primitives.
import asyncio

from worker import SimpleInput, child_task

async def main() -> None:
    # Start both runs, then await them together
    result1 = child_task.aio_run(SimpleInput(message="Hello, World!"))
    result2 = child_task.aio_run(SimpleInput(message="Hello, Moon!"))

    results = await asyncio.gather(result1, result2)

    print(results[0]["transformed_message"])
    print(results[1]["transformed_message"])
Or use aio_run_many to submit a batch in one call:
# inside an async function:
greetings = ["Hello, World!", "Hello, Moon!", "Hello, Mars!"]

results = await child_task.aio_run_many(
    [
        child_task.create_bulk_run_item(
            input=SimpleInput(message=greeting),
        )
        for greeting in greetings
    ]
)

print(results)

Next steps

DAG workflows

Chain tasks together with explicit dependencies.

Durable execution

Build tasks that sleep, wait for events, and survive restarts.

Child workflows

Spawn sub-workflows from inside a running task.

Flow control

Add concurrency limits and rate limits.
