Add a cron expression directly to the workflow definition to have Hatchet run the workflow automatically on that schedule.
Each cron expression declared on a workflow definition runs on the schedule as long as at least one worker with that workflow is connected. You do not need to manage the trigger separately.
You can also create named cron triggers at runtime via the Hatchet client. This is useful when each customer or tenant needs their own schedule.
Cron names must be unique per workflow. Creating a cron with a name that already exists will update the existing trigger rather than create a duplicate.
Python (sync)
Python (async)
Go
programatic-sync.py
from pydantic import BaseModel

from hatchet_sdk import Hatchet

hatchet = Hatchet()


class DynamicCronInput(BaseModel):
    # Input payload passed to each run triggered by the cron.
    name: str


dynamic_cron_workflow = hatchet.workflow(
    name="DynamicCronWorkflow", input_validator=DynamicCronInput
)

# Create a named cron trigger. Cron names are unique per workflow, so
# re-creating with the same name updates the existing trigger.
cron_trigger = dynamic_cron_workflow.create_cron(
    cron_name="customer-a-daily-report",
    expression="0 12 * * *",
    input=DynamicCronInput(name="John Doe"),
    additional_metadata={"customer_id": "customer-a"},
)

cron_id = cron_trigger.metadata.id  # the id of the cron trigger

# List all cron triggers
cron_triggers = hatchet.cron.list()

# Get a specific cron trigger
cron_trigger = hatchet.cron.get(cron_id=cron_id)

# Delete a cron trigger
hatchet.cron.delete(cron_id=cron_id)
programatic-async.py
from pydantic import BaseModel

from hatchet_sdk import Hatchet

hatchet = Hatchet()


class DynamicCronInput(BaseModel):
    # Input payload passed to each run triggered by the cron.
    name: str


async def create_cron() -> None:
    """Create, list, fetch, and delete a named cron trigger via the async API."""
    dynamic_cron_workflow = hatchet.workflow(
        name="DynamicCronWorkflow", input_validator=DynamicCronInput
    )

    # Create a named cron trigger
    cron_trigger = await dynamic_cron_workflow.aio_create_cron(
        cron_name="customer-a-daily-report",
        expression="0 12 * * *",
        input=DynamicCronInput(name="John Doe"),
        additional_metadata={"customer_id": "customer-a"},
    )

    cron_id = cron_trigger.metadata.id

    # List all cron triggers
    await hatchet.cron.aio_list()

    # Get a specific cron trigger
    cron_trigger = await hatchet.cron.aio_get(cron_id=cron_id)

    # Delete a cron trigger
    await hatchet.cron.aio_delete(cron_id=cron_id)
Schedule a single run to execute at a specific future datetime. Hatchet stores the trigger and fires it at the given time regardless of whether a worker is currently connected.
Python (sync)
Python (async)
Go
programatic-sync.py
from datetime import datetime, timedelta, timezone

from hatchet_sdk import Hatchet

hatchet = Hatchet()

# Schedule a run 10 seconds from now. Hatchet stores the trigger and fires
# it at the given time even if no worker is connected right now.
scheduled_run = hatchet.scheduled.create(
    workflow_name="simple-workflow",
    trigger_at=datetime.now(tz=timezone.utc) + timedelta(seconds=10),
    input={"data": "simple-workflow-data"},
    additional_metadata={"customer_id": "customer-a"},
)

scheduled_id = scheduled_run.metadata.id  # the id of the scheduled run trigger

# Reschedule to a later time
hatchet.scheduled.update(
    scheduled_id=scheduled_id,
    trigger_at=datetime.now(tz=timezone.utc) + timedelta(hours=1),
)

# List all scheduled runs
scheduled_runs = hatchet.scheduled.list()

# Get a specific scheduled run
scheduled_run = hatchet.scheduled.get(scheduled_id=scheduled_id)

# Delete a scheduled run
hatchet.scheduled.delete(scheduled_id=scheduled_id)
programatic-async.py
from datetime import datetime, timedelta, timezone

from hatchet_sdk import Hatchet

hatchet = Hatchet()


async def create_scheduled() -> None:
    """Create a scheduled run via the async API, then delete, list, and fetch it."""
    scheduled_run = await hatchet.scheduled.aio_create(
        workflow_name="simple-workflow",
        trigger_at=datetime.now(tz=timezone.utc) + timedelta(seconds=10),
        input={"data": "simple-workflow-data"},
        additional_metadata={"customer_id": "customer-a"},
    )

    scheduled_id = scheduled_run.metadata.id

    # Delete it
    await hatchet.scheduled.aio_delete(scheduled_id=scheduled_id)

    # List all
    await hatchet.scheduled.aio_list()

    # Get by id
    scheduled_run = await hatchet.scheduled.aio_get(scheduled_id=scheduled_id)
main.go
package mainimport ( "context" "log" "time" "github.com/hatchet-dev/hatchet/pkg/client/rest" hatchet "github.com/hatchet-dev/hatchet/sdks/go" "github.com/hatchet-dev/hatchet/sdks/go/features")// Schedule a run 1 minute from nowscheduledRun, err := client.Schedules().Create( context.Background(), "scheduled", features.CreateScheduledRunTrigger{ TriggerAt: time.Now().Add(1 * time.Minute), Input: map[string]interface{}{"message": "Hello, World!"}, },)if err != nil { log.Fatalf("failed to create scheduled run: %v", err)}// Delete the scheduled runerr = client.Schedules().Delete( context.Background(), scheduledRun.Metadata.Id,)// List all scheduled runsscheduledRuns, err := client.Schedules().List( context.Background(), rest.WorkflowScheduledListParams{},)
You can also schedule a run from inside a task. This is useful for chaining workflows across long time gaps:
worker.py
from datetime import datetime, timedelta, timezone

from hatchet_sdk import Context, Hatchet

hatchet = Hatchet(debug=True)

print_printer_wf = hatchet.workflow(name="PrintPrinterWorkflow")
print_schedule_wf = hatchet.workflow(name="PrintScheduleWorkflow")


@print_schedule_wf.task()
def schedule(input, ctx: Context) -> None:
    """Task that schedules PrintPrinterWorkflow to run 15 seconds in the future."""
    run_at = datetime.now(tz=timezone.utc) + timedelta(seconds=15)

    # Schedule the second workflow to run 15 seconds from now
    print_printer_wf.schedule(run_at, input=input)
Inside a durable task, call ctx.aio_sleep_for() to pause execution for a fixed duration without holding a worker slot. Hatchet persists the checkpoint and resumes the task on any available worker when the timer fires.
A workflow can declare one or more event keys. Whenever your application pushes a matching event, Hatchet starts a new run of the workflow automatically.