
Standalone tasks

A standalone task is the simplest way to define a unit of work. Apply the @hatchet.task() decorator to a function and Hatchet wraps it in a single-task workflow.
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()

@hatchet.task()
def hello(input: EmptyModel, ctx: Context) -> dict[str, str]:
    return {"message": "Hello, world!"}
Both sync and async functions are supported:
@hatchet.task()
async def hello_async(input: EmptyModel, ctx: Context) -> dict[str, str]:
    return {"message": "Hello, world!"}

Task decorator parameters

name
str | None
default:"function name"
The name used to register this task. Defaults to the decorated function’s __name__.
description
str | None
An optional human-readable description.
input_validator
type[BaseModel] | None
A Pydantic model class used to validate and parse the input passed to the task. When not set, defaults to EmptyModel (accepts any extra fields).
on_events
list[str] | None
A list of event keys that automatically trigger this task when pushed via hatchet.event.push().
on_crons
list[str] | None
A list of cron expressions that schedule recurring runs of this task.
sticky
StickyStrategy | None
Pin task execution to a specific worker. Values: StickyStrategy.SOFT, StickyStrategy.HARD.
default_priority
int
default:"1"
Scheduling priority for this task. Higher values are scheduled before lower ones.
concurrency
int | ConcurrencyExpression | list[ConcurrencyExpression] | None
Limit how many runs can execute concurrently. Pass an int for a constant cap with GROUP_ROUND_ROBIN strategy, or a ConcurrencyExpression for CEL-based grouping.
schedule_timeout
timedelta | str
default:"timedelta(minutes=5)"
Maximum time to wait for a worker slot before cancelling the run.
execution_timeout
timedelta | str
default:"timedelta(seconds=60)"
Maximum time allowed for the task to complete once it starts.
retries
int
default:"0"
Number of times to retry the task on failure.
rate_limits
list[RateLimit] | None
Rate limit configurations applied to this task.
backoff_factor
float | None
Multiplier controlling exponential backoff between retries.
backoff_max_seconds
int | None
Upper bound (in seconds) for exponential backoff retry delays.
default_additional_metadata
dict | None
Metadata attached to every run of this task by default.
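Hatchet computes retry backoff server-side, so the exact schedule is not something the SDK exposes; as a rough illustration of how backoff_factor and backoff_max_seconds interact (the formula below is an assumption for illustration, not the documented implementation):

```python
def backoff_delay(retry_number: int, backoff_factor: float, backoff_max_seconds: int) -> float:
    """Hypothetical exponential-backoff curve: factor^n, capped at the maximum."""
    return min(backoff_factor ** retry_number, backoff_max_seconds)

# With backoff_factor=2.0 and backoff_max_seconds=60, successive retries
# would wait roughly 2s, 4s, 8s, 16s, 32s, then plateau at 60s.
delays = [backoff_delay(n, 2.0, 60) for n in range(1, 8)]
print(delays)
```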

Input validation with Pydantic

Pass a Pydantic model as input_validator to get typed, validated input:
from pydantic import BaseModel
from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()

class GreetInput(BaseModel):
    name: str

@hatchet.task(input_validator=GreetInput)
def greet(input: GreetInput, ctx: Context) -> dict[str, str]:
    return {"message": f"Hello, {input.name}!"}

Durable tasks

Durable tasks use @hatchet.durable_task() and receive a DurableContext instead of a plain Context. They must be async and support additional durability primitives such as sleeping, waiting for external events, and memoizing results.
from hatchet_sdk import DurableContext, EmptyModel, Hatchet
from datetime import timedelta

hatchet = Hatchet()

@hatchet.durable_task()
async def my_durable_task(input: EmptyModel, ctx: DurableContext) -> dict[str, str]:
    # Durably sleep for 10 minutes — survives worker restarts
    await ctx.aio_sleep_for(timedelta(minutes=10))

    # Wait for an external event
    payload = await ctx.aio_wait_for_event("user:approved")

    return {"approved_by": payload.get("user_id")}
@hatchet.durable_task() accepts the same parameters as @hatchet.task() plus:
eviction_policy
EvictionPolicy | None
Controls when an idle durable task is evicted from a worker slot while waiting for a condition.

Workflows

A workflow groups multiple tasks and defines dependencies between them (a DAG). Create a workflow with hatchet.workflow(), then add tasks with @workflow.task().
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()

# Define the workflow
dag_workflow = hatchet.workflow(name="DAGWorkflow")

# First task — no parents
@dag_workflow.task()
def step1(input: EmptyModel, ctx: Context) -> dict[str, int]:
    return {"value": 10}

# Second task — runs after step1
@dag_workflow.task(parents=[step1])
def step2(input: EmptyModel, ctx: Context) -> dict[str, int]:
    parent_value = ctx.task_output(step1)["value"]
    return {"doubled": parent_value * 2}

hatchet.workflow() parameters

name
str
required
The name used to register this workflow.
description
str | None
Optional human-readable description.
input_validator
type[BaseModel] | None
Pydantic model for validating workflow input. Defaults to EmptyModel.
on_events
list[str] | None
Event keys that trigger this workflow automatically.
on_crons
list[str] | None
Cron expressions for recurring execution.
sticky
StickyStrategy | None
Affinity strategy for pinning runs to a worker.
default_priority
int
default:"1"
Scheduling priority for workflow runs.
concurrency
int | ConcurrencyExpression | list[ConcurrencyExpression] | None
Concurrency limits applied at the workflow level.
task_defaults
TaskDefaults
Default schedule_timeout, execution_timeout, retries, backoff_factor, backoff_max_seconds, and priority applied to all tasks in this workflow unless overridden per task.
version
str | None
An optional version string for the workflow.
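task_defaults fills in settings for tasks that don't specify their own, and a per-task value always wins. A minimal sketch of that precedence rule (the field names mirror the SDK, but the resolution logic here is illustrative, not SDK code):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class WorkflowTaskDefaults:
    """Illustrative stand-in for TaskDefaults."""
    retries: int = 0
    execution_timeout: str = "60s"

def resolve_retries(task_retries: Optional[int], defaults: WorkflowTaskDefaults) -> int:
    # A per-task setting overrides the workflow-level default when present.
    return task_retries if task_retries is not None else defaults.retries

defaults = WorkflowTaskDefaults(retries=3)
print(resolve_retries(None, defaults))  # falls back to the workflow default
print(resolve_retries(5, defaults))     # per-task override wins
```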

Workflow task decorator parameters

@workflow.task() accepts the following parameters from @hatchet.task(): name, schedule_timeout, execution_timeout, retries, rate_limits, backoff_factor, backoff_max_seconds, concurrency, and desired_worker_labels. Workflow-level parameters (description, input_validator, on_events, on_crons, version, sticky, default_priority, default_filters, default_additional_metadata) are not accepted here; set those on the enclosing hatchet.workflow(). In addition, it accepts:
parents
list[Task] | None
List of task objects that must complete before this task runs. Parent tasks must be defined before their dependents.
wait_for
list[Condition | OrGroup] | None
Additional conditions that must be satisfied before this task runs.
skip_if
list[Condition | OrGroup] | None
Conditions that, if met, cause this task to be skipped.
cancel_if
list[Condition | OrGroup] | None
Conditions that, if met, cause this task to be cancelled.
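The parents lists define a DAG that Hatchet resolves into an execution order on the server. Conceptually this is a topological sort, sketched here with plain dicts rather than SDK objects:

```python
from graphlib import TopologicalSorter

# The same dependency shape you would declare with @workflow.task(parents=[...]),
# expressed as task name -> list of parent task names.
graph = {
    "step1": [],                  # no parents
    "step2": ["step1"],           # runs after step1
    "step3": ["step1", "step2"],  # runs after both
}

# static_order() yields each task only after all of its parents.
order = list(TopologicalSorter(graph).static_order())
print(order)
```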

On-failure and on-success tasks

Use @workflow.on_failure_task() to run cleanup logic when any task fails, and @workflow.on_success_task() to run logic after all tasks succeed:
@dag_workflow.on_failure_task()
def handle_failure(input: EmptyModel, ctx: Context) -> None:
    errors = ctx.task_run_errors
    # notify, clean up, etc.

@dag_workflow.on_success_task()
def handle_success(input: EmptyModel, ctx: Context) -> None:
    pass  # all tasks completed successfully
Only one on_failure_task and one on_success_task may be registered per workflow.

Task context

Every task receives a ctx: Context (or ctx: DurableContext) as its second argument.

Reading parent task output

@dag_workflow.task(parents=[step1, step2])
async def step3(input: EmptyModel, ctx: Context) -> dict[str, int]:
    one = ctx.task_output(step1)["value"]
    two = ctx.task_output(step2)["doubled"]
    return {"sum": one + two}
ctx.task_output(task) returns the typed output of a parent task. It raises ValueError if the parent was skipped or its output is not found.

Logging

ctx.log("processing item")           # string
ctx.log({"key": "value"})            # JSON-serializable mapping

Refreshing the execution timeout

from datetime import timedelta

ctx.refresh_timeout(timedelta(minutes=5))  # extend by 5 minutes
ctx.refresh_timeout("5m")                  # string form also accepted
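For the string form, a short unit-suffixed duration like "5m" is accepted. A hypothetical parser for durations of that shape is sketched below; the SDK's actual accepted grammar may be broader, so treat this only as a reading aid:

```python
import re
from datetime import timedelta

# Maps the unit suffix to the corresponding timedelta keyword argument.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}

def parse_duration(s: str) -> timedelta:
    """Parse strings like '30s', '5m', or '2h' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smh])", s)
    if match is None:
        raise ValueError(f"unsupported duration: {s!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})

print(parse_duration("5m"))  # 0:05:00
```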

Cancelling a run

ctx.cancel()           # synchronous
await ctx.aio_cancel() # async

Checking retry state

ctx.retry_count    # int — 0 on first attempt
ctx.attempt_number # int — 1 on first attempt (retry_count + 1)
ctx.max_attempts   # int — retries + 1
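The three values are related by simple arithmetic, and a common pattern is to gate non-idempotent side effects on the first attempt. A pure-Python sketch of the relationships documented above (not SDK code):

```python
def attempt_state(retry_count: int, retries: int) -> dict[str, int]:
    """Mirror the retry-state fields on the context."""
    return {
        "attempt_number": retry_count + 1,  # 1-based: first attempt is 1
        "max_attempts": retries + 1,        # the original attempt plus retries
    }

state = attempt_state(retry_count=0, retries=3)
print(state)  # first attempt of up to four total
```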

DurableContext extras

# Durably sleep
await ctx.aio_sleep_for(timedelta(minutes=10))
await ctx.aio_sleep_until(some_datetime)

# Wait for a user event
payload = await ctx.aio_wait_for_event("order:paid")

# Wait with multiple conditions
result = await ctx.aio_wait_for(
    "my-signal",
    SleepCondition(duration=timedelta(hours=1)),
    UserEventCondition(event_key="order:paid"),
)

# Memoize an expensive async call
result = await ctx._aio_memo(fetch_data, MyModel, arg1, arg2)

Triggering tasks and workflows

Standalone tasks and Workflow objects share the same run methods.

Synchronous trigger and wait

result = hello.run()                # EmptyModel input
result = greet.run(GreetInput(name="Alice"))  # typed input

Async trigger and wait

result = await hello.aio_run()

Fire-and-forget

ref = hello.run_no_wait()           # returns WorkflowRunRef
ref = await hello.aio_run_no_wait()

# Retrieve the result later
result = ref.result()
result = await ref.aio_result()

Bulk trigger

items = [
    greet.create_bulk_run_item(GreetInput(name="Alice")),
    greet.create_bulk_run_item(GreetInput(name="Bob")),
]

results = greet.run_many(items)
results = await greet.aio_run_many(items)

# Without waiting for completion
refs = greet.run_many_no_wait(items)
refs = await greet.aio_run_many_no_wait(items)

Scheduling

One-time schedule

from datetime import datetime

schedule = hello.schedule(datetime(2025, 3, 14, 15, 9, 26))
print(schedule.id)

await hello.aio_schedule(datetime(2025, 3, 14, 15, 9, 26))

Cron schedule

cron = hello.create_cron(
    cron_name="nightly-hello",
    expression="0 2 * * *",
)

cron = await hello.aio_create_cron(
    cron_name="nightly-hello",
    expression="0 2 * * *",
)
Cron aliases @daily, @hourly, @weekly, @monthly, and @yearly are also accepted.
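These aliases follow the conventional cron expansions shown below (standard cron convention; confirm against Hatchet's parser if the exact semantics matter for your schedule):

```python
# Conventional expansions (minute hour day-of-month month day-of-week).
CRON_ALIASES = {
    "@hourly":  "0 * * * *",   # top of every hour
    "@daily":   "0 0 * * *",   # midnight every day
    "@weekly":  "0 0 * * 0",   # midnight every Sunday
    "@monthly": "0 0 1 * *",   # midnight on the 1st of each month
    "@yearly":  "0 0 1 1 *",   # midnight on January 1st
}

print(CRON_ALIASES["@daily"])
```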
