Concurrency limits let you cap how many runs of a task or workflow execute at the same time. Common use cases include:
  • Per-user throttling — allow only one active job per user at a time
  • Per-tenant fairness — prevent any single tenant from monopolizing worker slots
  • Resource protection — limit concurrent calls to a downstream service
Hatchet evaluates concurrency rules at the workflow level. When a new run arrives and the limit is already reached, Hatchet applies the configured strategy to decide whether to cancel a running workflow, queue the new one, or fairly rotate across groups.
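The three outcomes can be sketched as a toy decision function (illustrative pseudologic under assumed names, not the scheduler's actual code):

```python
from enum import Enum

class Strategy(Enum):
    CANCEL_IN_PROGRESS = "cancel_in_progress"  # cancel an active run, admit the new one
    CANCEL_NEWEST = "cancel_newest"            # reject the incoming run
    GROUP_ROUND_ROBIN = "group_round_robin"    # queue it; drain groups fairly

def on_new_run(active: list, queued: list, run, limit: int, strategy: Strategy):
    """Toy model of what happens when a run arrives at a group."""
    if len(active) < limit:
        active.append(run)          # below the cap: start immediately
    elif strategy is Strategy.CANCEL_IN_PROGRESS:
        active.pop(0)               # cancel the oldest active run...
        active.append(run)          # ...and start the new one
    elif strategy is Strategy.CANCEL_NEWEST:
        pass                        # drop the incoming run
    else:
        queued.append(run)          # round-robin: wait for a slot to open

active, queued = ["run-1"], []
on_new_run(active, queued, "run-2", limit=1, strategy=Strategy.CANCEL_IN_PROGRESS)
assert active == ["run-2"]  # the older run was cancelled in favor of the new one
```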

Set a constant limit

Pass an integer to concurrency to apply a simple cap across all runs of that workflow.
worker.py
from hatchet_sdk import Hatchet

hatchet = Hatchet(debug=True)

# Allow at most 5 concurrent runs of this workflow
simple_workflow = hatchet.workflow(
    name="SimpleWorkflow",
    concurrency=5,
)
Under the hood, passing an integer is shorthand for a ConcurrencyExpression with a constant grouping expression (so every run falls into a single shared group) and limit_strategy=GROUP_ROUND_ROBIN.
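For comparison, a sketch of the equivalent explicit form (the "'constant'" key below is an illustrative placeholder; any CEL expression that yields the same value for every run produces a single shared group):

```python
from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Hatchet

hatchet = Hatchet(debug=True)

# Roughly what concurrency=5 expands to: one shared group capped at 5,
# drained round-robin. "'constant'" is just a CEL string literal used
# here for illustration, not a special SDK value.
simple_workflow = hatchet.workflow(
    name="SimpleWorkflow",
    concurrency=ConcurrencyExpression(
        expression="'constant'",
        max_runs=5,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    ),
)
```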

Dynamic concurrency with ConcurrencyExpression

Use a CEL expression to group runs dynamically. Each unique value of the expression forms its own concurrency group, and max_runs applies independently per group.
worker.py
from pydantic import BaseModel
from hatchet_sdk import (
    ConcurrencyExpression,
    ConcurrencyLimitStrategy,
    Context,
    Hatchet,
)

hatchet = Hatchet(debug=True)

class WorkflowInput(BaseModel):
    run: int
    group_key: str

concurrency_limit_workflow = hatchet.workflow(
    name="ConcurrencyDemoWorkflow",
    concurrency=ConcurrencyExpression(
        expression="input.group_key",
        max_runs=5,
        limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
    ),
    input_validator=WorkflowInput,
)

@concurrency_limit_workflow.task()
def step1(input: WorkflowInput, ctx: Context) -> dict:
    return {"run": input.run}

ConcurrencyExpression parameters

expression (string, required)
A CEL expression that determines the concurrency group for each run. Each unique result value forms an independent group with its own max_runs counter. For example, "input.user_id" creates a separate limit per user.

max_runs (int, required)
Maximum number of concurrent runs allowed within each group.

limit_strategy (ConcurrencyLimitStrategy, required)
What to do when max_runs is reached. See limit strategies below.
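The per-group counting these parameters describe can be sketched in plain Python (an illustrative model, not Hatchet's implementation):

```python
from collections import defaultdict

class GroupLimiter:
    """Toy model: max_runs applies independently to each group key."""
    def __init__(self, max_runs: int):
        self.max_runs = max_runs
        self.active = defaultdict(int)  # group key -> currently running count

    def try_start(self, key: str) -> bool:
        if self.active[key] >= self.max_runs:
            return False  # group is full; the limit strategy decides what happens
        self.active[key] += 1
        return True

    def finish(self, key: str) -> None:
        self.active[key] -= 1

limiter = GroupLimiter(max_runs=2)
assert limiter.try_start("user-a")
assert limiter.try_start("user-a")
assert not limiter.try_start("user-a")  # "user-a" is at its limit
assert limiter.try_start("user-b")      # other groups are unaffected
```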

Limit strategies

CANCEL_IN_PROGRESS

When a new run arrives and the group is full, Hatchet cancels the oldest running workflow in that group and starts the new one. Use this when the latest request supersedes earlier ones — for example, re-processing a document after it has been edited.
worker.py
import asyncio

from pydantic import BaseModel
from hatchet_sdk import (
    ConcurrencyExpression,
    ConcurrencyLimitStrategy,
    Context,
    Hatchet,
)

hatchet = Hatchet(debug=True)

class WorkflowInput(BaseModel):
    group: str

concurrency_cancel_in_progress_workflow = hatchet.workflow(
    name="ConcurrencyCancelInProgress",
    concurrency=ConcurrencyExpression(
        expression="input.group",
        max_runs=1,
        limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
    ),
    input_validator=WorkflowInput,
)

@concurrency_cancel_in_progress_workflow.task()
async def step1(input: WorkflowInput, ctx: Context) -> None:
    # Simulate ~5 seconds of work in small increments
    for _ in range(50):
        await asyncio.sleep(0.1)

@concurrency_cancel_in_progress_workflow.task(parents=[step1])
async def step2(input: WorkflowInput, ctx: Context) -> None:
    # Simulate ~5 seconds of work in small increments
    for _ in range(50):
        await asyncio.sleep(0.1)

CANCEL_NEWEST

When the group is full, the incoming run is cancelled and the currently running workflows are left untouched. Use this when the work already in progress is more important than the new request.
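A minimal sketch of this strategy (assuming the same input model as the example above; only the limit_strategy value and the workflow name, which is made up here, differ):

```python
from pydantic import BaseModel
from hatchet_sdk import (
    ConcurrencyExpression,
    ConcurrencyLimitStrategy,
    Hatchet,
)

hatchet = Hatchet(debug=True)

class WorkflowInput(BaseModel):
    group: str

# With CANCEL_NEWEST, a run arriving into a full group is cancelled
# immediately; runs already in flight keep going.
concurrency_cancel_newest_workflow = hatchet.workflow(
    name="ConcurrencyCancelNewest",
    concurrency=ConcurrencyExpression(
        expression="input.group",
        max_runs=1,
        limit_strategy=ConcurrencyLimitStrategy.CANCEL_NEWEST,
    ),
    input_validator=WorkflowInput,
)
```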

GROUP_ROUND_ROBIN

Runs are distributed fairly across all active concurrency groups. When a slot opens up, Hatchet picks the group that has waited the longest and starts its next queued run.
GROUP_ROUND_ROBIN is a fairness strategy. In a multi-tenant system, it ensures no single tenant can starve others even if it submits many more jobs. This is also the strategy used when you pass a plain integer to concurrency.
worker.py
import time
from pydantic import BaseModel
from hatchet_sdk import (
    ConcurrencyExpression,
    ConcurrencyLimitStrategy,
    Context,
    Hatchet,
)

hatchet = Hatchet(debug=True)

class WorkflowInput(BaseModel):
    group: str

concurrency_limit_rr_workflow = hatchet.workflow(
    name="ConcurrencyDemoWorkflowRR",
    concurrency=ConcurrencyExpression(
        expression="input.group",
        max_runs=1,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    ),
    input_validator=WorkflowInput,
)

@concurrency_limit_rr_workflow.task()
def step1(input: WorkflowInput, ctx: Context) -> None:
    print("starting step1")
    time.sleep(2)
    print("finished step1")

Multiple concurrency keys

Pass a list of ConcurrencyExpression objects to enforce several independent limits simultaneously. A run must satisfy all keys to proceed. For example, you can limit runs per tier (max 20 across the tier) and also per account (max 20 within the account) at the same time:
worker.py
from hatchet_sdk import (
    ConcurrencyExpression,
    ConcurrencyLimitStrategy,
    Hatchet,
)

hatchet = Hatchet(debug=True)

multi_key_workflow = hatchet.workflow(
    name="MultiKeyConcurrency",
    concurrency=[
        ConcurrencyExpression(
            expression="input.tier",
            max_runs=20,
            limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
        ),
        ConcurrencyExpression(
            expression="input.account",
            max_runs=20,
            limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
        ),
    ],
)
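To make "must satisfy all keys" concrete, here is a toy model (illustrative only, not the SDK's logic): a run starts only if every key's group has a free slot, and slots are claimed all-or-nothing.

```python
from collections import defaultdict

class MultiKeyLimiter:
    """Toy model: each key keeps its own per-group counters and cap."""
    def __init__(self, caps: dict[str, int]):
        self.caps = caps  # key name -> max_runs for that key
        self.active = {key: defaultdict(int) for key in caps}

    def try_start(self, run: dict[str, str]) -> bool:
        # A run proceeds only if every key's group has capacity.
        for key, cap in self.caps.items():
            if self.active[key][run[key]] >= cap:
                return False
        # All checks passed: claim a slot in every group at once.
        for key in self.caps:
            self.active[key][run[key]] += 1
        return True

limiter = MultiKeyLimiter({"tier": 2, "account": 1})
assert limiter.try_start({"tier": "free", "account": "acme"})
# The account group is full even though the tier still has room:
assert not limiter.try_start({"tier": "free", "account": "acme"})
# A different account in the same tier still fits:
assert limiter.try_start({"tier": "free", "account": "globex"})
# Now the "free" tier is at its cap of 2:
assert not limiter.try_start({"tier": "free", "account": "initech"})
```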

Next steps

Rate limiting

Throttle task execution within sliding time windows.

Scheduling

Run tasks on cron schedules or at a specific future time.
