Hatchet is built around a small set of composable primitives. Understanding them will help you design reliable, observable background workloads.

Tasks

A task is a single unit of work — a function registered with Hatchet that can be enqueued, executed, retried, and observed. Every task execution is persisted to Postgres. A task has:
  • Typed input — validated via Pydantic (BaseModel) in Python, a plain type in TypeScript/Go.
  • Typed output — the return value of the function, also persisted.
  • Retries — configurable retry count on failure.
  • Timeouts — execution_timeout limits how long the task can run; schedule_timeout limits how long it can wait in the queue before being picked up.
  • Priority — integer (1–3) controlling relative ordering within a queue.
from pydantic import BaseModel
from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()

class SimpleInput(BaseModel):
    message: str

@hatchet.task(name="first-task", input_validator=SimpleInput)
def first_task(input: SimpleInput, ctx: Context) -> dict:
    return {"transformed_message": input.message.lower()}
Use input_validator to get typed access to your input and automatic validation errors before your function body runs.
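The properties above map onto decorator options. A minimal sketch of a fully configured task — the parameter names retries, execution_timeout, and schedule_timeout follow the bullets above, but check the SDK reference for exact types and defaults:

```python
from datetime import timedelta

from pydantic import BaseModel

from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()

class SimpleInput(BaseModel):
    message: str

@hatchet.task(
    name="configured-task",
    input_validator=SimpleInput,
    retries=3,                               # re-run up to 3 times on failure
    execution_timeout=timedelta(minutes=5),  # cap on run time once started
    schedule_timeout=timedelta(hours=1),     # cap on time spent queued
)
def configured_task(input: SimpleInput, ctx: Context) -> dict:
    return {"transformed_message": input.message.lower()}
```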

Workflows

A workflow is a named collection of tasks that can be triggered together. When you use @hatchet.task(...) directly, Hatchet creates a standalone single-task workflow automatically. When you need multiple tasks to run in sequence or in parallel, you define a multi-task workflow explicitly. Workflows can be triggered:
  • Manually — by calling .run() or .aio_run() from your application code.
  • By event — via on_events in the workflow config, when a matching event key is published.
  • On a schedule — via on_crons or by calling .schedule() for a one-time future run.
from hatchet_sdk import Hatchet, Context, EmptyModel

hatchet = Hatchet()

# A workflow is a collection of tasks
simple = hatchet.workflow(name="SimpleWorkflow")

@simple.task()
def task_1(input: EmptyModel, ctx: Context) -> dict:
    return {"result": "task_1"}

@simple.task(parents=[task_1])
def task_2(input: EmptyModel, ctx: Context) -> None:
    first_result = ctx.task_output(task_1)
    print(first_result)
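With the workflow defined, the manual trigger from the list above looks roughly like this (a sketch — it requires a running Hatchet server and a worker registered with this workflow):

```python
# Trigger SimpleWorkflow from application code. run() blocks until the
# workflow completes and returns the collected task outputs.
result = simple.run()
print(result)

# From async code, use the non-blocking variant instead:
# result = await simple.aio_run()
```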

Workers

A worker is a long-running process that connects to the Hatchet server, registers the workflows it can handle, and polls for tasks to execute. Workers are your application code — Hatchet dispatches tasks to them over a persistent connection. Key worker properties:
  • Slots — the number of concurrent task runs the worker will accept at once. A worker with slots=10 can execute up to 10 tasks in parallel.
  • Workflow registration — a worker only executes the workflows it was started with. Tasks for unregistered workflows are routed to other workers.
worker = hatchet.worker(
    "first-worker",
    slots=10,
    workflows=[first_task],
)
worker.start()
You can run multiple worker processes — even on different machines — all registered to the same workflow. Hatchet will distribute tasks across them.
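A worker can register any mix of standalone tasks and multi-task workflows. A sketch combining the two defined earlier in this page:

```python
# Register both the standalone task and the multi-task workflow
# on a single worker process.
worker = hatchet.worker(
    "combined-worker",
    slots=10,
    workflows=[first_task, simple],
)
worker.start()
```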

DAGs

A DAG (directed acyclic graph) is a workflow where tasks have explicit parent-child dependencies. When a parent task completes, its output is automatically available to child tasks via ctx.task_output(). Tasks with no parents run immediately; tasks with parents wait until all parents have succeeded.
from hatchet_sdk import Hatchet, Context, EmptyModel

hatchet = Hatchet()
dag = hatchet.workflow(name="SimpleDAG")

@dag.task()
def step_1(input: EmptyModel, ctx: Context) -> dict:
    return {"value": 42}

@dag.task(parents=[step_1])
def step_2(input: EmptyModel, ctx: Context) -> dict:
    result = ctx.task_output(step_1)
    return {"doubled": result["value"] * 2}
In TypeScript:
const dag = hatchet.workflow<DagInput, DagOutput>({ name: 'simple-dag' });

const task1 = dag.task({
  name: 'task-1',
  fn: (input) => ({ result: 'task-1' }),
});

const task2 = dag.task({
  name: 'task-2',
  parents: [task1],
  fn: async (input, ctx) => {
    const firstResult = await ctx.parentOutput(task1);
    console.log(firstResult);
  },
});
DAGs are ideal when you have a fixed, pre-defined pipeline shape. For dynamic fan-out (spawning an unknown number of tasks at runtime), see durable execution.

Durable execution

Durable tasks are a special task type (@hatchet.durable_task()) designed to orchestrate long-running or event-driven work. They store a full history of all spawned sub-tasks, which means:
  • If your worker crashes mid-execution, the task resumes from the last checkpoint rather than restarting from scratch.
  • A durable task can sleep for a duration without holding a worker slot.
  • A durable task can wait for an external event before continuing.
from hatchet_sdk import DurableContext, EmptyModel, Hatchet

hatchet = Hatchet()

@hatchet.durable_task()
async def simple_durable(input: EmptyModel, ctx: DurableContext) -> dict:
    # Durable tasks should be async
    return {"result": "Hello, world!"}
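The sleep behavior above comes from the DurableContext. A sketch using ctx.aio_sleep_for (the same call mentioned in the Scheduling section below):

```python
from datetime import timedelta

from hatchet_sdk import DurableContext, EmptyModel, Hatchet

hatchet = Hatchet()

@hatchet.durable_task()
async def sleeping_durable(input: EmptyModel, ctx: DurableContext) -> dict:
    # Pause for 24 hours without holding a worker slot. If the worker
    # crashes in the meantime, the task resumes here after the sleep
    # rather than restarting from the top.
    await ctx.aio_sleep_for(timedelta(hours=24))
    return {"status": "woke up"}
```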
You can also add conditional triggers to tasks inside a DAG — for example, wait for a user event or a sleep timer, whichever fires first:
from datetime import timedelta
from hatchet_sdk.conditions import SleepCondition, UserEventCondition, or_

@dag.task(
    parents=[first_task],
    wait_for=[
        or_(
            SleepCondition(timedelta(seconds=10)),
            UserEventCondition(event_key="user:event"),
        )
    ]
)
def second_task(input: EmptyModel, ctx: Context) -> dict:
    return {"completed": "true"}

Flow control

Hatchet provides two primitives for protecting your system from overload.

Concurrency limits

Set a maximum number of concurrent runs for a workflow, optionally scoped to a dynamic key (such as a user ID) using a CEL expression.
from hatchet_sdk.runnables.types import ConcurrencyExpression, ConcurrencyLimitStrategy

flow_control_workflow = hatchet.workflow(
    name="FlowControlWorkflow",
    concurrency=ConcurrencyExpression(
        expression="input.user_id",
        max_runs=5,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    ),
)
Available limit strategies:
  • GROUP_ROUND_ROBIN — distribute available slots evenly across concurrency groups, round-robin by key.
  • CANCEL_IN_PROGRESS — cancel in-progress runs in the group to free slots for newly arriving runs.
  • CANCEL_NEWEST — cancel the incoming run when the group is already at its limit.

Rate limits

Rate limits cap how many task executions are allowed in a time window. Like concurrency, they support dynamic keys so you can apply per-user or per-tenant limits:
from hatchet_sdk.rate_limit import RateLimit, RateLimitDuration

@flow_control_workflow.task(
    rate_limits=[
        RateLimit(
            dynamic_key="input.user_id",
            units=1,
            limit=10,
            duration=RateLimitDuration.MINUTE,
        )
    ]
)
def rate_limited_task(input: EmptyModel, ctx: Context) -> None:
    print("executed rate_limited_task")
Rate limits and concurrency limits operate at the Hatchet server level, not inside your worker process. They are enforced before a task is dispatched to a worker.
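Besides dynamic keys, a limit can be shared across all runs of a task via a static key. A hedged sketch — the upfront declaration call (hatchet.rate_limits.put) and the static_key parameter are assumptions drawn from the SDK's rate-limit client; consult the SDK reference before relying on them:

```python
from hatchet_sdk.rate_limit import RateLimit, RateLimitDuration

# Declare a shared, named limit once at startup (assumed API):
# at most 100 units per minute across all tasks that reference it.
hatchet.rate_limits.put("external-api", 100, RateLimitDuration.MINUTE)

@flow_control_workflow.task(
    rate_limits=[RateLimit(static_key="external-api", units=1)]
)
def shared_limit_task(input, ctx) -> None:
    print("consumed 1 unit of the shared external-api limit")
```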

Scheduling

Hatchet supports three scheduling mechanisms:

Cron schedules

Run a task on a repeating cron schedule:
cron = simple.create_cron(
    cron_name="every-day",
    expression="0 0 * * *",
    input=SimpleInput(message="Hello, World!"),
)

One-time scheduled runs

Schedule a task to run at a specific future time:
from datetime import datetime, timedelta

tomorrow = datetime.today() + timedelta(days=1)
scheduled = simple.schedule(
    tomorrow,
    SimpleInput(message="Hello, World!")
)

Event-triggered runs

Configure a workflow to start automatically when a matching event key is published:
workflow = hatchet.workflow(
    name="OnEventWorkflow",
    on_events=["user:created"],
)
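Publishing a matching event then starts the workflow. A sketch using the SDK's event client:

```python
# Push an event with key "user:created"; any workflow listing that key
# in on_events is triggered with the payload as input.
hatchet.event.push("user:created", {"user_id": "1234"})
```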
Durable sleep

Durable sleep (ctx.aio_sleep_for(...)) lets a running task pause for a duration without holding a worker slot, which is more efficient than blocking the thread.

All concepts at a glance

Tasks

Functions registered with Hatchet, with typed inputs/outputs, retries, and timeouts.

DAGs

Multi-step workflows with parent-child dependencies and automatic output routing.

Durable execution

Long-running tasks that sleep, wait for events, and resume after crashes.

Workers

Processes that register workflows and execute tasks dispatched by the Hatchet server.

Flow control

Concurrency limits and rate limits scoped globally or per user/tenant.

Scheduling

Cron, one-time, and event-based triggers for your workflows.
