Standalone tasks
A standalone task is the simplest unit of work. Use client.NewStandaloneTask() to create one — it behaves like a single-step workflow and exposes Run(), RunNoWait(), and RunMany() directly.
package main
import (
"context"
"fmt"
"log"
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)
type SimpleInput struct {
Message string `json:"message"`
}
type SimpleOutput struct {
Result string `json:"result"`
}
func main() {
client, err := hatchet.NewClient()
if err != nil {
log.Fatal(err)
}
task := client.NewStandaloneTask(
"process-message",
func(ctx hatchet.Context, input SimpleInput) (SimpleOutput, error) {
return SimpleOutput{Result: "Processed: " + input.Message}, nil
},
)
result, err := task.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
if err != nil {
log.Fatal(err)
}
var output SimpleOutput
if err := result.Into(&output); err != nil {
log.Fatal(err)
}
fmt.Println(output.Result) // Processed: Hello, World!
}
Signature
func (c *Client) NewStandaloneTask(name string, fn any, options ...StandaloneTaskOption) *StandaloneTask
The fn parameter must have the signature:
func(ctx hatchet.Context, input InputType) (OutputType, error)
Function signatures are validated at runtime using reflection. Both InputType and OutputType can be any struct — the SDK uses JSON marshal/unmarshal to convert between types.
Multi-step workflows (DAGs)
Use client.NewWorkflow() to declare a workflow and add tasks to it. Tasks can declare WithParents() to form a directed acyclic graph (DAG).
package main
import (
"context"
"log"
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)
type Input struct {
Value int `json:"value"`
}
type StepOutput struct {
Step int `json:"step"`
Result int `json:"result"`
}
func main() {
client, err := hatchet.NewClient()
if err != nil {
log.Fatal(err)
}
workflow := client.NewWorkflow("dag-workflow")
// First task — no parents, runs immediately
step1 := workflow.NewTask("step-1", func(ctx hatchet.Context, input Input) (StepOutput, error) {
return StepOutput{Step: 1, Result: input.Value * 2}, nil
})
// Second task — depends on step1
step2 := workflow.NewTask("step-2", func(ctx hatchet.Context, input Input) (StepOutput, error) {
var step1Output StepOutput
if err := ctx.ParentOutput(step1, &step1Output); err != nil {
return StepOutput{}, err
}
return StepOutput{Step: 2, Result: step1Output.Result + 10}, nil
}, hatchet.WithParents(step1))
_ = step2
// Run the workflow
result, err := workflow.Run(context.Background(), Input{Value: 5})
if err != nil {
log.Fatal(err)
}
_ = result
}
Task functions
Every task function must have exactly this signature:
func(ctx hatchet.Context, input InputType) (OutputType, error)
For durable tasks, use hatchet.DurableContext instead:
func(ctx hatchet.DurableContext, input InputType) (OutputType, error)
The SDK validates function signatures at runtime using reflection and panics with a descriptive message if the signature does not match.
Workflow options
WithWorkflowConcurrency(concurrency ...types.Concurrency)
Sets concurrency controls for the workflow. Each types.Concurrency value contains an Expression (a CEL expression over the workflow input) and a MaxRuns count.workflow := client.NewWorkflow("my-workflow",
hatchet.WithWorkflowConcurrency(types.Concurrency{
Expression: "input.userId",
MaxRuns: 5,
}),
)
WithWorkflowEvents(events ...string)
Configures the workflow to trigger automatically when any of the listed event keys are pushed.workflow := client.NewWorkflow("user-handler",
hatchet.WithWorkflowEvents("user:created", "user:updated"),
)
WithWorkflowCron(cronExpressions ...string)
Configures the workflow to run on one or more cron schedules.workflow := client.NewWorkflow("nightly-job",
hatchet.WithWorkflowCron("0 0 * * *"),
)
WithWorkflowCronInput(input any)
Sets the input used when the workflow is triggered by a cron schedule.
WithWorkflowStickyStrategy(strategy types.StickyStrategy)
Sets the sticky worker strategy. When enabled, Hatchet routes all tasks in a workflow run to the same worker.
WithWorkflowDefaultPriority(priority RunPriority)
Sets the default run priority for this workflow. Available constants: RunPriorityLow (1), RunPriorityMedium (2), RunPriorityHigh (3).
WithWorkflowVersion(version string)
Sets an optional version string for the workflow definition.
WithWorkflowDescription(description string)
Sets a human-readable description shown in the Hatchet dashboard.
WithWorkflowTaskDefaults(defaults *create.TaskDefaults)
Sets default configuration applied to all tasks in the workflow. Individual tasks can override these defaults.
Task options
WithParents(parents ...*Task)
Declares parent task dependencies. The task will not run until all listed parent tasks have completed successfully.step2 := workflow.NewTask("step-2", myFn, hatchet.WithParents(step1))
Number of times to retry the task on failure.
WithRetryBackoff(factor float32, maxBackoffSeconds int)
Configures exponential backoff for retries. factor is the base of the exponent and maxBackoffSeconds caps the delay.
WithExecutionTimeout(timeout time.Duration)
Maximum wall-clock time the task function is allowed to run.hatchet.WithExecutionTimeout(30 * time.Second)
WithScheduleTimeout(timeout time.Duration)
Maximum time the task is allowed to wait in the queue before it starts.
WithRateLimits(rateLimits ...*types.RateLimit)
Rate limit keys consumed when this task runs.
WithConcurrency(concurrency ...*types.Concurrency)
Concurrency controls scoped to this specific task.
WithWaitFor(condition condition.Condition)
A condition that must be met before the task executes. Use hatchet.SleepCondition(), hatchet.UserEventCondition(), or hatchet.ParentCondition() to create conditions.
WithSkipIf(condition condition.Condition)
A condition that, if met, causes the task to be skipped.
WithDescription(description string)
A human-readable description for the task.
Context methods
The hatchet.Context is passed as the first argument to every task function. It implements context.Context and provides access to workflow execution metadata.
Accessing parent outputs
In DAG workflows, use ctx.ParentOutput() to retrieve the typed output of a completed parent task:
step2 := workflow.NewTask("step-2", func(ctx hatchet.Context, input Input) (StepOutput, error) {
var step1Output StepOutput
if err := ctx.ParentOutput(step1, &step1Output); err != nil {
return StepOutput{}, err
}
return StepOutput{Result: step1Output.Result + 10}, nil
}, hatchet.WithParents(step1))
Logging
func myTask(ctx hatchet.Context, input Input) (Output, error) {
ctx.Log("Processing started")
// Log lines appear in the Hatchet dashboard under the task run
return Output{}, nil
}
Other context methods
| Method | Returns | Description |
|---|
ctx.WorkflowRunId() | string | ID of the current workflow run. |
ctx.StepRunId() | string | ID of the current task (step) run. |
ctx.StepName() | string | Name of the currently executing task. |
ctx.RetryCount() | int | Number of times this task has been retried. |
ctx.AdditionalMetadata() | map[string]string | Metadata attached to the workflow run. |
ctx.TriggeredByEvent() | bool | Whether the workflow was triggered by an event. |
ctx.WorkflowInput(target interface{}) | error | Unmarshals the full workflow input into target. |
ctx.StepRunErrors() | map[string]string | Task run errors — use in OnFailure handlers. |
ctx.Priority() | int32 | Priority of the current run. |
ctx.TenantId() | string | The tenant ID for the current run. |
ctx.WorkerId() | string | The ID of the worker executing the task. |
ctx.ReleaseSlot() | error | Releases the worker slot while the task continues. |
ctx.RefreshTimeout(duration string) | error | Extends the execution timeout by duration. |
ctx.StreamEvent(message []byte) | — | Streams raw bytes from the task. |
ctx.PutStream(message string) | — | Streams a string message from the task. |
ctx.WasSkipped(parent create.NamedTask) | bool | Whether the given parent task was skipped. |
Durable tasks
Durable tasks survive worker restarts and support long-running waits. Create them with workflow.NewDurableTask() or client.NewStandaloneDurableTask(). The task function receives a hatchet.DurableContext which extends hatchet.Context with:
| Method | Returns | Description |
|---|
ctx.SleepFor(duration time.Duration) | (*SingleWaitResult, error) | Pause execution durably for the given duration. |
ctx.WaitForEvent(eventKey, expression string) | (*SingleWaitResult, error) | Pause until the named user event arrives. |
ctx.WaitFor(condition condition.Condition) | (*WaitResult, error) | Pause until one or more conditions are satisfied. |
task := workflow.NewDurableTask(
"wait-for-approval",
func(ctx hatchet.DurableContext, input ApprovalInput) (*ApprovalOutput, error) {
// Pause for up to 24 hours waiting for an approval event
event, err := ctx.WaitForEvent("order:approved", "")
if err != nil {
return nil, err
}
var data map[string]interface{}
if err := hatchet.EventInto(event, &data); err != nil {
return nil, err
}
return &ApprovalOutput{Approved: true}, nil
},
)
Conditions
Conditions control when a task executes. Use them with WithWaitFor() and WithSkipIf().
// Wait 10 seconds before running
hatchet.WithWaitFor(hatchet.SleepCondition(10 * time.Second))
// Wait for a user event
hatchet.WithWaitFor(hatchet.UserEventCondition("order:approved", ""))
// Wait until a parent output satisfies a CEL expression
hatchet.WithWaitFor(hatchet.ParentCondition(step1, "output.status == 'ready'"))
// Any of the conditions must be met (OR)
hatchet.WithWaitFor(hatchet.OrCondition(
hatchet.SleepCondition(10 * time.Second),
hatchet.UserEventCondition("order:approved", ""),
))
// All conditions must be met (AND)
hatchet.WithWaitFor(hatchet.AndCondition(
hatchet.ParentCondition(step1, "output.ready == true"),
hatchet.UserEventCondition("payment:confirmed", ""),
))
Triggering runs
Runs a workflow and blocks until it completes. Returns a *WorkflowResult.
result, err := workflow.Run(context.Background(), Input{Value: 5})
if err != nil {
log.Fatal(err)
}
// Extract a specific task's output
var step1Out StepOutput
if err := result.TaskOutput("step-1").Into(&step1Out); err != nil {
log.Fatal(err)
}
Enqueues a workflow run and immediately returns a *WorkflowRunRef without blocking.
ref, err := workflow.RunNoWait(context.Background(), Input{Value: 5})
if err != nil {
log.Fatal(err)
}
fmt.Println(ref.RunId)
// Later, wait for the result
result, err := ref.Result()
Runs multiple workflow instances concurrently with different inputs. Returns []WorkflowRunRef.
refs, err := workflow.RunMany(context.Background(), []hatchet.RunManyOpt{
{Input: Input{Value: 1}},
{Input: Input{Value: 2}},
})
Run options
WithRunMetadata(metadata map[string]string)
Attaches arbitrary key-value metadata to the run, visible in the Hatchet dashboard.
WithRunPriority(priority RunPriority)
Sets the run priority. Constants: RunPriorityLow (1), RunPriorityMedium (2), RunPriorityHigh (3).
WithRunSticky(sticky bool)
When true and called from within a parent task, routes the run to the same worker.
A deduplication key for child workflow runs spawned from within a parent task.
WithDesiredWorkerLabels(labels map[string]*DesiredWorkerLabel)
Routes the run to workers that match the specified labels.
result, err := task.Run(ctx, input)
if err != nil {
log.Fatal(err)
}
// Unmarshal directly from a standalone task result
var output MyOutput
if err := result.Into(&output); err != nil {
log.Fatal(err)
}
// For workflow results, extract by task name
taskResult := workflowResult.TaskOutput("my-task")
var taskOutput MyTaskOutput
if err := taskResult.Into(&taskOutput); err != nil {
log.Fatal(err)
}
Event triggers
Workflows and standalone tasks can be triggered automatically when events are pushed. Pass WithWorkflowEvents() as a workflow option or hatchet.WithWorkflowEvents() as a StandaloneTaskOption:
package main
import (
"context"
"log"
"time"
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)
type EventInput struct {
UserID string `json:"user_id"`
Action string `json:"action"`
}
type ProcessOutput struct {
ProcessedAt string `json:"processed_at"`
UserID string `json:"user_id"`
}
func main() {
client, err := hatchet.NewClient()
if err != nil {
log.Fatal(err)
}
task := client.NewStandaloneTask(
"process-user-event",
func(ctx hatchet.Context, input EventInput) (ProcessOutput, error) {
return ProcessOutput{
ProcessedAt: time.Now().Format(time.RFC3339),
UserID: input.UserID,
}, nil
},
hatchet.WithWorkflowEvents("user:created", "user:updated"),
)
_ = task
// Push an event programmatically
err = client.Events().Push(context.Background(), "user:created", EventInput{
UserID: "user-123",
Action: "created",
})
if err != nil {
log.Fatal(err)
}
}
Cron triggers
Declare recurring cron schedules directly on a workflow or standalone task:
// On a workflow
workflow := client.NewWorkflow("nightly-report",
hatchet.WithWorkflowCron("0 0 * * *"),
hatchet.WithWorkflowCronInput(map[string]interface{}{"full": true}),
)
// On a standalone task
task := client.NewStandaloneTask(
"send-digest",
myFn,
hatchet.WithCron("0 9 * * 1-5"), // weekdays at 9 AM
)
Failure handlers
Call workflow.OnFailure() or task.OnFailure() to register a function that runs when any task in the workflow fails:
workflow.OnFailure(func(ctx hatchet.Context, input Input) (*Output, error) {
errors := ctx.StepRunErrors()
log.Printf("Workflow failed: %v", errors)
return nil, nil
})
Spawning child workflows
From within a task, use ctx.SpawnWorkflow() or call .Run() on another workflow or standalone task passing the hatchet.Context directly:
parent := workflow.NewTask("parent-task", func(ctx hatchet.Context, input Input) (*Output, error) {
// Pass the hatchet.Context to run a child workflow and wait for its result
result, err := childTask.Run(ctx, ChildInput{Value: input.Value})
if err != nil {
return nil, err
}
var childOutput ChildOutput
if err := result.Into(&childOutput); err != nil {
return nil, err
}
return &Output{Result: childOutput.Result}, nil
})