Hatchet supports three scheduling modes:
Mode      When to use
Cron      Repeat a task on a fixed schedule (e.g. every hour, every Monday at 9 AM)
One-time  Trigger a run at a specific future datetime
Event     Trigger a run whenever a named event is pushed from your application
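Cron triggers use standard five-field cron syntax (minute, hour, day of month, month, day of week). A few common patterns, for reference (the mapping below is illustrative, not part of the SDK):

```python
# Common five-field cron expressions: minute hour day-of-month month day-of-week
COMMON_CRONS = {
    "* * * * *": "every minute",
    "0 * * * *": "every hour, on the hour",
    "0 9 * * 1": "every Monday at 9:00 AM",
    "0 12 * * *": "every day at noon",
    "*/15 * * * *": "every 15 minutes",
}

for expression, meaning in COMMON_CRONS.items():
    print(f"{expression:15} -> {meaning}")
```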
For long-running tasks that need to sleep mid-execution, use a durable task with ctx.aio_sleep_for() instead of scheduling a new workflow.

Cron schedules

Declare a cron in the workflow definition

Add a cron expression directly to the workflow definition to run it on a schedule. Each cron expression declared this way fires as long as at least one worker registering that workflow is connected; you do not need to manage the trigger separately.
worker.py
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)

cron_workflow = hatchet.workflow(name="CronWorkflow", on_crons=["* * * * *"])

@cron_workflow.task()
def step1(input: EmptyModel, ctx: Context) -> dict[str, str]:
    return {"time": "step1"}

def main() -> None:
    worker = hatchet.worker("test-worker", slots=1, workflows=[cron_workflow])
    worker.start()

if __name__ == "__main__":
    main()

Create and manage crons programmatically

You can also create named cron triggers at runtime via the Hatchet client. This is useful when each customer or tenant needs their own schedule.
Cron names must be unique per workflow. Creating a cron with a name that already exists will update the existing trigger rather than create a duplicate.
programmatic-sync.py
from pydantic import BaseModel
from hatchet_sdk import Hatchet

hatchet = Hatchet()

class DynamicCronInput(BaseModel):
    name: str

dynamic_cron_workflow = hatchet.workflow(
    name="DynamicCronWorkflow", input_validator=DynamicCronInput
)

# Create a named cron trigger
cron_trigger = dynamic_cron_workflow.create_cron(
    cron_name="customer-a-daily-report",
    expression="0 12 * * *",
    input=DynamicCronInput(name="John Doe"),
    additional_metadata={"customer_id": "customer-a"},
)

cron_id = cron_trigger.metadata.id  # the id of the cron trigger

# List all cron triggers
cron_triggers = hatchet.cron.list()

# Get a specific cron trigger
cron_trigger = hatchet.cron.get(cron_id=cron_id)

# Delete a cron trigger
hatchet.cron.delete(cron_id=cron_id)
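Because cron names must be unique per workflow, one simple convention is to derive the name and expression from the tenant. A minimal sketch (the `tenant_cron` helper and its report-hour parameter are hypothetical, not part of the SDK):

```python
def tenant_cron(tenant_id: str, hour_utc: int) -> tuple[str, str]:
    """Build a per-tenant cron name and a daily cron expression."""
    if not 0 <= hour_utc <= 23:
        raise ValueError("hour_utc must be between 0 and 23")
    cron_name = f"{tenant_id}-daily-report"  # unique per workflow
    expression = f"0 {hour_utc} * * *"       # every day at hour_utc UTC
    return cron_name, expression

name, expr = tenant_cron("customer-a", 12)
print(name, expr)
```

Since creating a cron with an existing name updates the trigger in place, calling `create_cron` with a name derived this way is safe to repeat on every deploy.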

One-time scheduled runs

Schedule a single run to execute at a specific future datetime. Hatchet stores the trigger and fires it at the given time regardless of whether a worker is currently connected.
programmatic-sync.py
from datetime import datetime, timedelta, timezone
from hatchet_sdk import Hatchet

hatchet = Hatchet()

# Schedule a run 10 seconds from now
scheduled_run = hatchet.scheduled.create(
    workflow_name="simple-workflow",
    trigger_at=datetime.now(tz=timezone.utc) + timedelta(seconds=10),
    input={"data": "simple-workflow-data"},
    additional_metadata={"customer_id": "customer-a"},
)

scheduled_id = scheduled_run.metadata.id  # the id of the scheduled run trigger

# Reschedule to a later time
hatchet.scheduled.update(
    scheduled_id=scheduled_id,
    trigger_at=datetime.now(tz=timezone.utc) + timedelta(hours=1),
)

# List all scheduled runs
scheduled_runs = hatchet.scheduled.list()

# Get a specific scheduled run
scheduled_run = hatchet.scheduled.get(scheduled_id=scheduled_id)

# Delete a scheduled run
hatchet.scheduled.delete(scheduled_id=scheduled_id)
You can also schedule a run from inside a task. This is useful for chaining workflows across long time gaps:
worker.py
from datetime import datetime, timedelta, timezone
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)

print_printer_wf = hatchet.workflow(name="PrintPrinterWorkflow")
print_schedule_wf = hatchet.workflow(name="PrintScheduleWorkflow")

@print_schedule_wf.task()
def schedule(input: EmptyModel, ctx: Context) -> None:
    now = datetime.now(tz=timezone.utc)
    future_time = now + timedelta(seconds=15)
    # Schedule the second workflow to run 15 seconds from now
    print_printer_wf.schedule(future_time, input=input)

Durable sleep

Inside a durable task, call ctx.aio_sleep_for() to pause execution for a fixed duration without holding a worker slot. Hatchet persists the checkpoint and resumes the task on any available worker when the timer fires.
worker.py
from datetime import timedelta
from hatchet_sdk import DurableContext, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)

durable_workflow = hatchet.workflow(name="DurableWorkflow")

@durable_workflow.durable_task()
async def durable_task(input: EmptyModel, ctx: DurableContext) -> dict:
    sleep = await ctx.aio_sleep_for(duration=timedelta(seconds=5))
    return {"sleep_duration_seconds": sleep.duration.seconds}
See Durable execution for the full reference, including waiting for external events and combining conditions.

Event triggers

A workflow can declare one or more event keys. Whenever your application pushes a matching event, Hatchet starts a new run of the workflow automatically.

Define an event-triggered workflow

worker.py
from pydantic import BaseModel
from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()

EVENT_KEY = "user:create"

class EventWorkflowInput(BaseModel):
    should_skip: bool

event_workflow = hatchet.workflow(
    name="EventWorkflow",
    on_events=[EVENT_KEY],
    input_validator=EventWorkflowInput,
)

@event_workflow.task()
def task(input: EventWorkflowInput, ctx: Context) -> dict[str, str]:
    print("event received")
    return {}

def main() -> None:
    worker = hatchet.worker(name="EventWorker", workflows=[event_workflow])
    worker.start()

if __name__ == "__main__":
    main()

Push an event

Use the Hatchet client from anywhere in your application to fire an event:
event.py
from hatchet_sdk import Hatchet

hatchet = Hatchet()

hatchet.event.push("user:create", {"should_skip": False})

Next steps

Durable execution

Sleep mid-task and wait for external events without holding a worker slot.

Concurrency limits

Cap how many scheduled runs execute simultaneously.
