Skip to main content
Worker affinity lets you attach key/value labels to a worker at startup and then express requirements on task definitions so that the Hatchet scheduler sends each task to the right worker. A common use case is routing ML inference tasks to workers that have a specific model already loaded into GPU memory.

Worker labels

Labels are a dict[str, str | int] set on the worker at creation time. They describe what the worker can do — the model it has loaded, the amount of memory available, the region it is running in, and so on.
worker.py
from hatchet_sdk import Hatchet

hatchet = Hatchet(debug=True)

worker = hatchet.worker(
    "affinity-worker",
    slots=10,
    labels={
        "model": "fancy-ai-model-v2",
        "memory": 512,
    },
    workflows=[affinity_worker_workflow],
)
worker.start()

Desired worker labels on task definitions

Use desired_worker_labels on a task definition to tell the scheduler which worker labels are required or preferred for that task. Each entry in the dictionary maps a label key to a DesiredWorkerLabel specification.
worker.py
from hatchet_sdk import Context, EmptyModel, Hatchet, WorkerLabelComparator
from hatchet_sdk.labels import DesiredWorkerLabel

hatchet = Hatchet(debug=True)

affinity_worker_workflow = hatchet.workflow(name="AffinityWorkflow")


@affinity_worker_workflow.task(
    desired_worker_labels={
        # Soft preference: prefer the worker with this model loaded
        "model": DesiredWorkerLabel(value="fancy-ai-model-v2", weight=10),
        # Hard requirement: worker memory must be less than 256
        "memory": DesiredWorkerLabel(
            value=256,
            required=True,
            comparator=WorkerLabelComparator.LESS_THAN,
        ),
    },
)
async def step(input: EmptyModel, ctx: Context) -> dict:
    return {"worker": ctx.worker.id()}

DesiredWorkerLabel parameters

value
str | int
required
The label value to match against. Can be a string or an integer.
required
bool
default:"false"
When true, the task will only be dispatched to a worker whose label satisfies the comparator. When false, matching workers are preferred but the task can fall back to any available worker.
weight
int
A numeric weight used to rank workers when multiple workers satisfy the label requirement. Higher weight means stronger preference. Only meaningful when required=False.
comparator
WorkerLabelComparator
Determines how the task’s value is compared against the worker’s label value. Defaults to EQUAL. Available comparators:
ComparatorDescription
EQUALWorker label value must equal the desired value.
NOT_EQUALWorker label value must not equal the desired value.
GREATER_THANWorker label value must be greater than the desired value.
GREATER_THAN_OR_EQUALWorker label value must be greater than or equal to the desired value.
LESS_THANWorker label value must be less than the desired value.
LESS_THAN_OR_EQUALWorker label value must be less than or equal to the desired value.

Updating labels at runtime

A worker can update its own labels while it is running. This is useful when a task loads a new model into memory and needs to advertise that fact so future tasks can be routed to it.
Call ctx.worker.upsert_labels() from inside a task. The labels are applied immediately and affect subsequent dispatches.
worker.py
@affinity_worker_workflow.task(
    desired_worker_labels={
        "model": DesiredWorkerLabel(value="fancy-ai-model-v2", weight=10),
    },
)
async def step(input: EmptyModel, ctx: Context) -> dict:
    if ctx.worker.labels().get("model") != "fancy-ai-model-v2":
        ctx.worker.upsert_labels({"model": "unset"})
        # DO WORK TO EVICT OLD MODEL / LOAD NEW MODEL
        ctx.worker.upsert_labels({"model": "fancy-ai-model-v2"})

    return {"worker": ctx.worker.id()}

Targeting a worker at run time

You can specify desired labels when triggering a run, without declaring them on the task definition. This lets you route a specific invocation to a particular worker dynamically.
from hatchet_sdk import TriggerWorkflowOptions
from hatchet_sdk.labels import DesiredWorkerLabel

result = await affinity_example_task.aio_run(
    options=TriggerWorkflowOptions(
        desired_worker_label={
            "affinity": DesiredWorkerLabel(
                value="foo",
                required=True,
            ),
        }
    )
)

Full example

The following example starts two workers with different affinity labels, then runs 20 tasks — each targeting a specific worker — and asserts that the correct worker handled each run.
Worker (run twice, once with --label foo and once with --label bar):
worker.py
import argparse

from hatchet_sdk import Context, EmptyModel, Hatchet
from pydantic import BaseModel

hatchet = Hatchet(debug=True)


class AffinityResult(BaseModel):
    worker_id: str


@hatchet.task()
async def affinity_example_task(i: EmptyModel, c: Context) -> AffinityResult:
    return AffinityResult(worker_id=c.worker_id)


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--label", type=str, required=True)
    args = parser.parse_args()

    worker = hatchet.worker(
        "runtime-affinity-worker",
        labels={"affinity": args.label},
        workflows=[affinity_example_task],
    )
    worker.start()


if __name__ == "__main__":
    main()
Trigger (targeting a specific worker by label):
trigger.py
from hatchet_sdk import Hatchet, TriggerWorkflowOptions
from hatchet_sdk.labels import DesiredWorkerLabel
from examples.runtime_affinity.worker import affinity_example_task
from random import choice

labels = ["foo", "bar"]

target_worker = choice(labels)
res = await affinity_example_task.aio_run(
    options=TriggerWorkflowOptions(
        desired_worker_label={
            "affinity": DesiredWorkerLabel(
                value=target_worker,
                required=True,
            ),
        }
    )
)
print(res.worker_id)

Next steps

Task routing

Sticky assignment and priority-based routing.

Workers overview

Slots, lifespan functions, and worker startup.

Build docs developers (and LLMs) love