Skip to main content
By default, Hatchet dispatches each task to any available worker that has registered the corresponding workflow. You can override this behavior to pin tasks to specific workers or control execution order.

Sticky assignment

Sticky assignment ensures that all tasks within a workflow run execute on the same worker. This is useful when your tasks share in-process state — for example, a loaded ML model or an in-memory cache — that would be expensive to reconstruct on a different worker. Set sticky on the workflow or standalone task definition. Two strategies are available:
StrategyBehavior
StickyStrategy.SOFTPrefer the same worker. Falls back to any available worker if the original is unavailable.
StickyStrategy.HARDRequire the same worker. The task will not run unless the original worker is available.

Sticky workflow

worker.py
from hatchet_sdk import Context, EmptyModel, Hatchet, StickyStrategy

hatchet = Hatchet(debug=True)

sticky_workflow = hatchet.workflow(
    name="StickyWorkflow",
    # Specify a sticky strategy when declaring the workflow
    sticky=StickyStrategy.SOFT,
)


@sticky_workflow.task()
def step1a(input: EmptyModel, ctx: Context) -> dict:
    return {"worker": ctx.worker.id()}


@sticky_workflow.task()
def step1b(input: EmptyModel, ctx: Context) -> dict:
    return {"worker": ctx.worker.id()}
When sticky=StickyStrategy.SOFT is set on the workflow, Hatchet will try to route all tasks in the run to the same worker that picked up the first task.

Sticky child workflows

You can propagate sticky assignment to child workflows spawned from within a task. Pass sticky=True when spawning the child to request that it run on the same worker as the parent.
worker.py
from hatchet_sdk import Context, EmptyModel, Hatchet, StickyStrategy, TriggerWorkflowOptions

hatchet = Hatchet(debug=True)

sticky_workflow = hatchet.workflow(name="StickyWorkflow", sticky=StickyStrategy.SOFT)
sticky_child_workflow = hatchet.workflow(name="StickyChildWorkflow", sticky=StickyStrategy.SOFT)


@sticky_workflow.task(parents=[step1a, step1b])
async def step2(input: EmptyModel, ctx: Context) -> dict:
    # Run a child workflow on the same worker
    ref = await sticky_child_workflow.aio_run_no_wait(
        options=TriggerWorkflowOptions(sticky=True)
    )
    await ref.aio_result()
    return {"worker": ctx.worker.id()}


@sticky_child_workflow.task()
def child(input: EmptyModel, ctx: Context) -> dict:
    return {"worker": ctx.worker.id()}

Registering the worker

worker.py
def main() -> None:
    worker = hatchet.worker(
        "sticky-worker",
        slots=10,
        workflows=[sticky_workflow, sticky_child_workflow],
    )
    worker.start()

if __name__ == "__main__":
    main()
For StickyStrategy.HARD, the task will fail with a scheduling error rather than fall back to another worker if the original worker is unavailable. Use SOFT unless your correctness requirements demand the same worker.

Task priority

When multiple tasks are queued, Hatchet uses priority to determine which ones are dispatched first. Higher values win. Set a default priority on the workflow or task definition:
high_priority_workflow = hatchet.workflow(
    name="HighPriorityWorkflow",
    default_priority=3,  # 1 (low) to 3 (high), default is 1
)

# Or on a standalone task
@hatchet.task(default_priority=3)
def urgent_task(input: EmptyModel, ctx: Context) -> dict:
    return {}
You can also override priority at run time when triggering a task:
from hatchet_sdk import TriggerWorkflowOptions

my_task.run(
    options=TriggerWorkflowOptions(priority=3),
)
Priority values range from 1 (lowest) to 3 (highest). The default is 1. Use priority sparingly — if everything is high priority, nothing is.

Desired worker labels at run time

You can target a specific worker at the point of triggering a task run by passing desired_worker_labels (Python / TypeScript) or WithDesiredWorkerLabels (Go). The scheduler will prefer — or require, if required=True — a worker whose labels match.
from hatchet_sdk import TriggerWorkflowOptions
from hatchet_sdk.labels import DesiredWorkerLabel

my_task.run(
    options=TriggerWorkflowOptions(
        desired_worker_label={
            "affinity": DesiredWorkerLabel(
                value="foo",
                required=True,
            ),
        }
    )
)
See Worker affinity for full details on label-based routing and comparators.

Next steps

Worker affinity

Label-based routing to direct tasks to workers with specific capabilities.

Workers overview

Configure slots, lifespan functions, and worker startup.

Build docs developers (and LLMs) love