Label-based routing to direct tasks to workers with specific capabilities.
Worker affinity lets you attach key/value labels to a worker at startup and then express requirements on task definitions so that the Hatchet scheduler sends each task to the right worker. A common use case is routing ML inference tasks to workers that have a specific model already loaded into GPU memory.
Labels are a dict[str, str | int] set on the worker at creation time. They describe what the worker can do — the model it has loaded, the amount of memory available, the region it is running in, and so on.
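As a quick illustration of that shape (not part of the SDK, just a sketch of the dict[str, str | int] contract), a worker's label set might look like this, with a small validator enforcing the allowed value types:

```python
def validate_labels(labels: dict) -> dict:
    """Check that every label key is a str and every value is a str or int,
    mirroring the dict[str, str | int] type workers accept."""
    for key, value in labels.items():
        if not isinstance(key, str) or not isinstance(value, (str, int)):
            raise TypeError(f"invalid label {key!r}: {value!r}")
    return labels


worker_labels = validate_labels({
    "model": "fancy-ai-model-v2",  # model loaded into GPU memory
    "memory": 512,                 # memory available on this worker
    "region": "us-east-1",         # where the worker is running
})
```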
Use desired_worker_labels on a task definition to tell the scheduler which worker labels are required or preferred for that task. Each entry in the dictionary maps a label key to a DesiredWorkerLabel specification.
Python
TypeScript
Go
worker.py
```python
from hatchet_sdk import Context, EmptyModel, Hatchet, WorkerLabelComparator
from hatchet_sdk.labels import DesiredWorkerLabel

hatchet = Hatchet(debug=True)

affinity_worker_workflow = hatchet.workflow(name="AffinityWorkflow")


@affinity_worker_workflow.task(
    desired_worker_labels={
        # Soft preference: prefer the worker with this model loaded
        "model": DesiredWorkerLabel(value="fancy-ai-model-v2", weight=10),
        # Hard requirement: worker memory must be less than 256
        "memory": DesiredWorkerLabel(
            value=256,
            required=True,
            comparator=WorkerLabelComparator.LESS_THAN,
        ),
    },
)
async def step(input: EmptyModel, ctx: Context) -> dict:
    return {"worker": ctx.worker.id()}
```
required: When True, the task is dispatched only to a worker whose label satisfies the comparator. When False, matching workers are preferred, but the task can fall back to any available worker.
weight: A numeric weight used to rank workers when multiple workers satisfy the label. A higher weight means a stronger preference. Only meaningful when required=False.
A worker can update its own labels while it is running. This is useful when a task loads a new model into memory and needs to advertise that fact so future tasks can be routed to it.
Python
TypeScript
Go
Call ctx.worker.upsert_labels() from inside a task. The labels are applied immediately and affect subsequent dispatches.
worker.py
```python
@affinity_worker_workflow.task(
    desired_worker_labels={
        "model": DesiredWorkerLabel(value="fancy-ai-model-v2", weight=10),
    },
)
async def step(input: EmptyModel, ctx: Context) -> dict:
    if ctx.worker.labels().get("model") != "fancy-ai-model-v2":
        ctx.worker.upsert_labels({"model": "unset"})
        # DO WORK TO EVICT OLD MODEL / LOAD NEW MODEL
        ctx.worker.upsert_labels({"model": "fancy-ai-model-v2"})
    return {"worker": ctx.worker.id()}
```
Call ctx.worker.upsertLabels() from inside a task.
```typescript
workflow.task({
  name: 'step',
  fn: async (_, ctx) => {
    if (ctx.worker.labels()['model'] !== 'fancy-ai-model-v2') {
      await ctx.worker.upsertLabels({ model: 'unset' });
      // DO WORK TO EVICT OLD MODEL / LOAD NEW MODEL
      await ctx.worker.upsertLabels({ model: 'fancy-ai-model-v2' });
    }
    return { worker: ctx.worker.id() };
  },
});
```
Call ctx.Worker().UpsertLabels() from inside a task.
```go
_ = affinityWorkflow.NewTask("step", func(ctx hatchet.Context, _ any) (*AffinityOutput, error) {
	if ctx.Worker().GetLabels()["model"] != "fancy-ai-model-v2" {
		_ = ctx.Worker().UpsertLabels(map[string]interface{}{"model": "unset"})
		// DO WORK TO EVICT OLD MODEL / LOAD NEW MODEL
		_ = ctx.Worker().UpsertLabels(map[string]interface{}{"model": "fancy-ai-model-v2"})
	}
	return &AffinityOutput{Worker: ctx.Worker().ID()}, nil
})
```
You can specify desired labels when triggering a run, without declaring them on the task definition. This lets you route a specific invocation to a particular worker dynamically.
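Conceptually, per-run desired labels mean the routing constraint travels with the trigger rather than the task definition. The following is a hedged, standalone sketch of that idea (the workers, the dispatch helper, and the label values here are all hypothetical, not Hatchet API calls): each invocation carries its own label requirement, treated as a hard match.

```python
# Two hypothetical workers registered with different affinity labels.
workers = {
    "worker-foo": {"affinity": "foo"},
    "worker-bar": {"affinity": "bar"},
}


def dispatch(run_desired: dict[str, str]) -> str:
    """Route a single run using desired labels supplied at trigger time,
    treating every entry as a hard requirement."""
    for worker_id, labels in workers.items():
        if all(labels.get(k) == v for k, v in run_desired.items()):
            return worker_id
    raise RuntimeError(f"no worker satisfies {run_desired!r}")


# The same task, routed differently per invocation:
print(dispatch({"affinity": "foo"}))  # worker-foo
print(dispatch({"affinity": "bar"}))  # worker-bar
```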
The following example starts two workers with different affinity labels, then runs 20 tasks — each targeting a specific worker — and asserts that the correct worker handled each run.
Python
TypeScript
Go
Worker (run twice, once with --label foo and once with --label bar):