Standalone tasks

A standalone task is the simplest unit of work. Use client.NewStandaloneTask() to create one — it behaves like a single-step workflow and exposes Run(), RunNoWait(), and RunMany() directly.
workflow.go
package main

import (
    "context"
    "fmt"
    "log"

    hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

type SimpleInput struct {
    Message string `json:"message"`
}

type SimpleOutput struct {
    Result string `json:"result"`
}

func main() {
    client, err := hatchet.NewClient()
    if err != nil {
        log.Fatal(err)
    }

    task := client.NewStandaloneTask(
        "process-message",
        func(ctx hatchet.Context, input SimpleInput) (SimpleOutput, error) {
            return SimpleOutput{Result: "Processed: " + input.Message}, nil
        },
    )

    result, err := task.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
    if err != nil {
        log.Fatal(err)
    }

    var output SimpleOutput
    if err := result.Into(&output); err != nil {
        log.Fatal(err)
    }

    fmt.Println(output.Result) // Processed: Hello, World!
}

Signature

func (c *Client) NewStandaloneTask(name string, fn any, options ...StandaloneTaskOption) *StandaloneTask
The fn parameter must have the signature:
func(ctx hatchet.Context, input InputType) (OutputType, error)
Function signatures are validated at runtime using reflection. Both InputType and OutputType can be any struct — the SDK uses JSON marshal/unmarshal to convert between types.
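To make the JSON-based conversion concrete, here is a minimal sketch that uses only the standard library. The helper convert is hypothetical and is not part of the Hatchet SDK. It only illustrates the general idea of turning one value into another type by marshalling it to JSON and unmarshalling the bytes into the target, with fields matched by their json tags.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SimpleInput mirrors the input struct used in the example above.
type SimpleInput struct {
	Message string `json:"message"`
}

// convert is a hypothetical helper (not an SDK function). It round-trips
// src through JSON so that fields are matched by their json tags.
func convert(src any, dst any) error {
	raw, err := json.Marshal(src)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}
	if err := json.Unmarshal(raw, dst); err != nil {
		return fmt.Errorf("unmarshal: %w", err)
	}
	return nil
}

func main() {
	// An untyped payload, as it might arrive over the wire.
	payload := map[string]any{"message": "Hello, World!"}

	var input SimpleInput
	if err := convert(payload, &input); err != nil {
		panic(err)
	}
	fmt.Println(input.Message)
}
```

One consequence of this approach is that only exported fields with matching JSON keys survive the conversion, so input and output structs should carry explicit json tags, as the examples on this page do.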

Multi-step workflows (DAGs)

Use client.NewWorkflow() to declare a workflow and add tasks to it. Tasks can declare WithParents() to form a directed acyclic graph (DAG).
dag.go
package main

import (
    "context"
    "log"

    hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

type Input struct {
    Value int `json:"value"`
}

type StepOutput struct {
    Step   int `json:"step"`
    Result int `json:"result"`
}

func main() {
    client, err := hatchet.NewClient()
    if err != nil {
        log.Fatal(err)
    }

    workflow := client.NewWorkflow("dag-workflow")

    // First task — no parents, runs immediately
    step1 := workflow.NewTask("step-1", func(ctx hatchet.Context, input Input) (StepOutput, error) {
        return StepOutput{Step: 1, Result: input.Value * 2}, nil
    })

    // Second task — depends on step1
    step2 := workflow.NewTask("step-2", func(ctx hatchet.Context, input Input) (StepOutput, error) {
        var step1Output StepOutput
        if err := ctx.ParentOutput(step1, &step1Output); err != nil {
            return StepOutput{}, err
        }
        return StepOutput{Step: 2, Result: step1Output.Result + 10}, nil
    }, hatchet.WithParents(step1))

    _ = step2

    // Run the workflow
    result, err := workflow.Run(context.Background(), Input{Value: 5})
    if err != nil {
        log.Fatal(err)
    }

    _ = result
}
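The parent relationships declared with WithParents() determine which tasks may run before others. As a rough illustration of that ordering, and not of how Hatchet schedules tasks internally, the standalone sketch below computes one valid execution order from a map of task names to parent names. The function executionOrder and its map-based input are assumptions made for this example only.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// executionOrder returns task names so that every task appears after all of
// its parents. parents maps a task name to the names it depends on.
// This is an illustrative helper, not part of the Hatchet SDK.
func executionOrder(parents map[string][]string) ([]string, error) {
	pending := make(map[string]int, len(parents)) // unmet parent count per task
	children := make(map[string][]string)
	for task, ps := range parents {
		pending[task] = len(ps)
		for _, p := range ps {
			if _, ok := parents[p]; !ok {
				return nil, fmt.Errorf("task %q has unknown parent %q", task, p)
			}
			children[p] = append(children[p], task)
		}
	}

	// Tasks without parents can run immediately.
	var ready []string
	for task, n := range pending {
		if n == 0 {
			ready = append(ready, task)
		}
	}
	sort.Strings(ready) // keep the output deterministic

	var order []string
	for len(ready) > 0 {
		task := ready[0]
		ready = ready[1:]
		order = append(order, task)

		next := children[task]
		sort.Strings(next)
		for _, c := range next {
			pending[c]--
			if pending[c] == 0 {
				ready = append(ready, c)
			}
		}
	}
	if len(order) != len(parents) {
		return nil, errors.New("dependency cycle detected")
	}
	return order, nil
}

func main() {
	order, err := executionOrder(map[string][]string{
		"step-1": nil,
		"step-2": {"step-1"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(order) // [step-1 step-2]
}
```

The cycle check at the end reflects why the graph has to be acyclic: a task that transitively depends on itself can never become ready.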

Task functions

Every task function must have exactly this signature:
func(ctx hatchet.Context, input InputType) (OutputType, error)
For durable tasks, use hatchet.DurableContext instead:
func(ctx hatchet.DurableContext, input InputType) (OutputType, error)
The SDK validates function signatures at runtime using reflection and panics with a descriptive message if the signature does not match.
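For readers curious what reflection-based validation can look like, the following is a minimal sketch under stated assumptions. It is not the SDK's implementation: validateTaskFn, goodTask, and badTask are hypothetical names, context.Context stands in for hatchet.Context, and the sketch returns an error where the SDK is described as panicking.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// validateTaskFn is a hypothetical checker. It accepts functions shaped like
// func(ctx, In) (Out, error), using context.Context as a stand-in for
// hatchet.Context.
func validateTaskFn(fn any) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return fmt.Errorf("expected a function, got %T", fn)
	}
	if t.NumIn() != 2 || t.NumOut() != 2 {
		return fmt.Errorf("expected 2 parameters and 2 results, got %d and %d",
			t.NumIn(), t.NumOut())
	}
	if !t.In(0).Implements(ctxType) {
		return fmt.Errorf("first parameter %s does not implement context.Context", t.In(0))
	}
	if t.Out(1) != errType {
		return fmt.Errorf("second result must be error, got %s", t.Out(1))
	}
	return nil
}

// goodTask has the expected shape.
func goodTask(ctx context.Context, in string) (int, error) { return len(in), nil }

// badTask lacks the context parameter and the error result.
func badTask(in string) int { return len(in) }

func main() {
	fmt.Println(validateTaskFn(goodTask))
	fmt.Println(validateTaskFn(badTask))
}
```

Because the check happens at runtime, a mismatched signature is not caught by the compiler. Registering tasks early in main() surfaces such mistakes at startup rather than on the first run.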

Workflow options

WithWorkflowConcurrency(concurrency ...types.Concurrency)
WorkflowOption
Sets concurrency controls for the workflow. Each types.Concurrency value contains an Expression (a CEL expression over the workflow input) and a MaxRuns count.
workflow := client.NewWorkflow("my-workflow",
    hatchet.WithWorkflowConcurrency(types.Concurrency{
        Expression: "input.userId",
        MaxRuns:    5,
    }),
)
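To build intuition for what a per-key limit such as the one above means, here is a small in-memory sketch. keyedLimiter is a hypothetical type written for this page, not something the SDK exposes, and it says nothing about what Hatchet does with runs that exceed the limit. The key plays the role of the value produced by the concurrency expression (for example, the user ID).

```go
package main

import (
	"fmt"
	"sync"
)

// keyedLimiter is a hypothetical illustration, not an SDK type. It allows at
// most maxRuns simultaneous runs per key.
type keyedLimiter struct {
	mu      sync.Mutex
	maxRuns int
	active  map[string]int
}

func newKeyedLimiter(maxRuns int) *keyedLimiter {
	return &keyedLimiter{maxRuns: maxRuns, active: make(map[string]int)}
}

// TryAcquire reserves a slot for key and reports whether one was available.
func (l *keyedLimiter) TryAcquire(key string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.active[key] >= l.maxRuns {
		return false
	}
	l.active[key]++
	return true
}

// Release frees a slot previously reserved for key.
func (l *keyedLimiter) Release(key string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.active[key] > 0 {
		l.active[key]--
	}
}

func main() {
	limiter := newKeyedLimiter(1)
	fmt.Println(limiter.TryAcquire("user-1")) // true: first run for user-1
	fmt.Println(limiter.TryAcquire("user-1")) // false: user-1 is at its limit
	fmt.Println(limiter.TryAcquire("user-2")) // true: other keys are unaffected
	limiter.Release("user-1")
	fmt.Println(limiter.TryAcquire("user-1")) // true: the slot was freed
}
```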
WithWorkflowEvents(events ...string)
WorkflowOption
Configures the workflow to trigger automatically when any of the listed event keys are pushed.
workflow := client.NewWorkflow("user-handler",
    hatchet.WithWorkflowEvents("user:created", "user:updated"),
)
WithWorkflowCron(cronExpressions ...string)
WorkflowOption
Configures the workflow to run on one or more cron schedules.
workflow := client.NewWorkflow("nightly-job",
    hatchet.WithWorkflowCron("0 0 * * *"),
)
WithWorkflowCronInput(input any)
WorkflowOption
Sets the input used when the workflow is triggered by a cron schedule.
WithWorkflowStickyStrategy(strategy types.StickyStrategy)
WorkflowOption
Sets the sticky worker strategy. When enabled, Hatchet routes all tasks in a workflow run to the same worker.
WithWorkflowDefaultPriority(priority RunPriority)
WorkflowOption
Sets the default run priority for this workflow. Available constants: RunPriorityLow (1), RunPriorityMedium (2), RunPriorityHigh (3).
WithWorkflowVersion(version string)
WorkflowOption
Sets an optional version string for the workflow definition.
WithWorkflowDescription(description string)
WorkflowOption
Sets a human-readable description shown in the Hatchet dashboard.
WithWorkflowTaskDefaults(defaults *create.TaskDefaults)
WorkflowOption
Sets default configuration applied to all tasks in the workflow. Individual tasks can override these defaults.

Task options

WithParents(parents ...*Task)
TaskOption
Declares parent task dependencies. The task will not run until all listed parent tasks have completed successfully.
step2 := workflow.NewTask("step-2", myFn, hatchet.WithParents(step1))
WithRetries(retries int)
TaskOption
Number of times to retry the task on failure.
WithRetryBackoff(factor float32, maxBackoffSeconds int)
TaskOption
Configures exponential backoff for retries. factor is the base of the exponent and maxBackoffSeconds caps the delay.
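As a worked illustration of how the two parameters interact, the sketch below assumes the delay before retry n is factor^n seconds, capped at maxBackoffSeconds. That formula is an assumption made for this example; the exact schedule Hatchet applies may differ. backoffDelay is a hypothetical helper, not an SDK function.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay is a hypothetical helper, not the SDK's implementation. It
// assumes the delay before retry n (1-based) is factor^n seconds, capped at
// maxBackoffSeconds.
func backoffDelay(factor float32, maxBackoffSeconds int, retry int) time.Duration {
	seconds := math.Pow(float64(factor), float64(retry))
	if limit := float64(maxBackoffSeconds); seconds > limit {
		seconds = limit
	}
	return time.Duration(seconds * float64(time.Second))
}

func main() {
	// With factor 2 and a 30-second cap the delays grow 2s, 4s, 8s, 16s
	// and then stay at 30s.
	for retry := 1; retry <= 6; retry++ {
		fmt.Printf("retry %d: wait %v\n", retry, backoffDelay(2, 30, retry))
	}
}
```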
WithExecutionTimeout(timeout time.Duration)
TaskOption
Maximum wall-clock time the task function is allowed to run.
hatchet.WithExecutionTimeout(30 * time.Second)
WithScheduleTimeout(timeout time.Duration)
TaskOption
Maximum time the task is allowed to wait in the queue before it starts.
WithRateLimits(rateLimits ...*types.RateLimit)
TaskOption
Rate limit keys consumed when this task runs.
WithConcurrency(concurrency ...*types.Concurrency)
TaskOption
Concurrency controls scoped to this specific task.
WithWaitFor(condition condition.Condition)
TaskOption
A condition that must be met before the task executes. Use hatchet.SleepCondition(), hatchet.UserEventCondition(), or hatchet.ParentCondition() to create conditions.
WithSkipIf(condition condition.Condition)
TaskOption
A condition that, if met, causes the task to be skipped.
WithDescription(description string)
TaskOption
A human-readable description for the task.

Context methods

The hatchet.Context is passed as the first argument to every task function. It implements context.Context and provides access to workflow execution metadata.

Accessing parent outputs

In DAG workflows, use ctx.ParentOutput() to retrieve the typed output of a completed parent task:
step2 := workflow.NewTask("step-2", func(ctx hatchet.Context, input Input) (StepOutput, error) {
    var step1Output StepOutput
    if err := ctx.ParentOutput(step1, &step1Output); err != nil {
        return StepOutput{}, err
    }
    return StepOutput{Result: step1Output.Result + 10}, nil
}, hatchet.WithParents(step1))

Logging

Call ctx.Log() inside a task function to emit a log line for the current task run:
func myTask(ctx hatchet.Context, input Input) (Output, error) {
    ctx.Log("Processing started")
    // Log lines appear in the Hatchet dashboard under the task run
    return Output{}, nil
}

Other context methods

| Method | Returns | Description |
| --- | --- | --- |
| ctx.WorkflowRunId() | string | ID of the current workflow run. |
| ctx.StepRunId() | string | ID of the current task (step) run. |
| ctx.StepName() | string | Name of the currently executing task. |
| ctx.RetryCount() | int | Number of times this task has been retried. |
| ctx.AdditionalMetadata() | map[string]string | Metadata attached to the workflow run. |
| ctx.TriggeredByEvent() | bool | Whether the workflow was triggered by an event. |
| ctx.WorkflowInput(target interface{}) | error | Unmarshals the full workflow input into target. |
| ctx.StepRunErrors() | map[string]string | Task run errors. Use in OnFailure handlers. |
| ctx.Priority() | int32 | Priority of the current run. |
| ctx.TenantId() | string | The tenant ID for the current run. |
| ctx.WorkerId() | string | The ID of the worker executing the task. |
| ctx.ReleaseSlot() | error | Releases the worker slot while the task continues. |
| ctx.RefreshTimeout(duration string) | error | Extends the execution timeout by duration. |
| ctx.StreamEvent(message []byte) | (none) | Streams raw bytes from the task. |
| ctx.PutStream(message string) | (none) | Streams a string message from the task. |
| ctx.WasSkipped(parent create.NamedTask) | bool | Whether the given parent task was skipped. |

Durable tasks

Durable tasks survive worker restarts and support long-running waits. Create them with workflow.NewDurableTask() or client.NewStandaloneDurableTask(). The task function receives a hatchet.DurableContext which extends hatchet.Context with:
| Method | Returns | Description |
| --- | --- | --- |
| ctx.SleepFor(duration time.Duration) | (*SingleWaitResult, error) | Pause execution durably for the given duration. |
| ctx.WaitForEvent(eventKey, expression string) | (*SingleWaitResult, error) | Pause until the named user event arrives. |
| ctx.WaitFor(condition condition.Condition) | (*WaitResult, error) | Pause until one or more conditions are satisfied. |
task := workflow.NewDurableTask(
    "wait-for-approval",
    func(ctx hatchet.DurableContext, input ApprovalInput) (*ApprovalOutput, error) {
        // Pause until an approval event arrives (this call sets no timeout)
        event, err := ctx.WaitForEvent("order:approved", "")
        if err != nil {
            return nil, err
        }

        var data map[string]interface{}
        if err := hatchet.EventInto(event, &data); err != nil {
            return nil, err
        }

        return &ApprovalOutput{Approved: true}, nil
    },
)

Conditions

Conditions control when a task executes. Use them with WithWaitFor() and WithSkipIf().
// Wait 10 seconds before running
hatchet.WithWaitFor(hatchet.SleepCondition(10 * time.Second))

// Wait for a user event
hatchet.WithWaitFor(hatchet.UserEventCondition("order:approved", ""))

// Wait until a parent output satisfies a CEL expression
hatchet.WithWaitFor(hatchet.ParentCondition(step1, "output.status == 'ready'"))

// Any of the conditions must be met (OR)
hatchet.WithWaitFor(hatchet.OrCondition(
    hatchet.SleepCondition(10 * time.Second),
    hatchet.UserEventCondition("order:approved", ""),
))

// All conditions must be met (AND)
hatchet.WithWaitFor(hatchet.AndCondition(
    hatchet.ParentCondition(step1, "output.ready == true"),
    hatchet.UserEventCondition("payment:confirmed", ""),
))
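The OR and AND combinators above compose like ordinary boolean logic. The self-contained sketch below models that composition with plain functions. Condition, Fact, Or, and And are hypothetical names for this illustration and do not correspond to the SDK's condition types. Each "fact" stands for something that has happened, such as a sleep elapsing or an event arriving.

```go
package main

import "fmt"

// Condition is a hypothetical stand-in for the SDK's condition type: it
// reports whether it is satisfied given the facts observed so far.
type Condition func(facts map[string]bool) bool

// Fact is satisfied once the named fact has been observed.
func Fact(name string) Condition {
	return func(facts map[string]bool) bool { return facts[name] }
}

// Or is satisfied when at least one child condition is satisfied.
func Or(conds ...Condition) Condition {
	return func(facts map[string]bool) bool {
		for _, c := range conds {
			if c(facts) {
				return true
			}
		}
		return false
	}
}

// And is satisfied only when every child condition is satisfied.
func And(conds ...Condition) Condition {
	return func(facts map[string]bool) bool {
		for _, c := range conds {
			if !c(facts) {
				return false
			}
		}
		return true
	}
}

func main() {
	// Only the sleep has elapsed so far.
	facts := map[string]bool{"sleep:10s": true}

	either := Or(Fact("sleep:10s"), Fact("event:order:approved"))
	both := And(Fact("parent:step-1:ready"), Fact("event:payment:confirmed"))

	fmt.Println(either(facts), both(facts)) // true false
}
```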

Triggering runs

workflow.Run(ctx, input, opts...)

Runs a workflow and blocks until it completes. Returns a *WorkflowResult.
result, err := workflow.Run(context.Background(), Input{Value: 5})
if err != nil {
    log.Fatal(err)
}

// Extract a specific task's output
var step1Out StepOutput
if err := result.TaskOutput("step-1").Into(&step1Out); err != nil {
    log.Fatal(err)
}

workflow.RunNoWait(ctx, input, opts...)

Enqueues a workflow run and immediately returns a *WorkflowRunRef without blocking.
ref, err := workflow.RunNoWait(context.Background(), Input{Value: 5})
if err != nil {
    log.Fatal(err)
}

fmt.Println(ref.RunId)

// Later, wait for the result
result, err := ref.Result()
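The enqueue-now, wait-later pattern shown above is a form of future. The sketch below reproduces the pattern with a goroutine and a channel so the control flow can be seen in isolation. runRef and startNoWait are hypothetical names for this illustration and are unrelated to the SDK's *WorkflowRunRef implementation.

```go
package main

import (
	"fmt"
	"time"
)

// runRef is a hypothetical stand-in for a run reference. The done channel is
// closed once value and err have been set.
type runRef struct {
	RunID string
	done  chan struct{}
	value int
	err   error
}

// startNoWait launches work in the background and returns immediately.
func startNoWait(id string, work func() (int, error)) *runRef {
	ref := &runRef{RunID: id, done: make(chan struct{})}
	go func() {
		defer close(ref.done) // signal completion after the fields are set
		ref.value, ref.err = work()
	}()
	return ref
}

// Result blocks until the background work has finished. It can be called
// more than once because a closed channel never blocks receivers.
func (r *runRef) Result() (int, error) {
	<-r.done
	return r.value, r.err
}

func main() {
	ref := startNoWait("run-1", func() (int, error) {
		time.Sleep(10 * time.Millisecond) // simulate work
		return 5 * 2, nil
	})
	fmt.Println("enqueued", ref.RunID) // printed before the work finishes

	value, err := ref.Result()
	fmt.Println(value, err)
}
```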

workflow.RunMany(ctx, inputs)

Runs multiple workflow instances concurrently with different inputs. Returns []WorkflowRunRef.
refs, err := workflow.RunMany(context.Background(), []hatchet.RunManyOpt{
    {Input: Input{Value: 1}},
    {Input: Input{Value: 2}},
})

Run options

WithRunMetadata(metadata map[string]string)
RunOptFunc
Attaches arbitrary key-value metadata to the run, visible in the Hatchet dashboard.
WithRunPriority(priority RunPriority)
RunOptFunc
Sets the run priority. Constants: RunPriorityLow (1), RunPriorityMedium (2), RunPriorityHigh (3).
WithRunSticky(sticky bool)
RunOptFunc
When true and called from within a parent task, routes the run to the same worker.
WithRunKey(key string)
RunOptFunc
A deduplication key for child workflow runs spawned from within a parent task.
WithDesiredWorkerLabels(labels map[string]*DesiredWorkerLabel)
RunOptFunc
Routes the run to workers that match the specified labels.

Extracting results

result, err := task.Run(ctx, input)
if err != nil {
    log.Fatal(err)
}

// Unmarshal directly from a standalone task result
var output MyOutput
if err := result.Into(&output); err != nil {
    log.Fatal(err)
}

// For workflow results, extract by task name
taskResult := workflowResult.TaskOutput("my-task")
var taskOutput MyTaskOutput
if err := taskResult.Into(&taskOutput); err != nil {
    log.Fatal(err)
}

Event triggers

Workflows and standalone tasks can be triggered automatically when events are pushed. Pass hatchet.WithWorkflowEvents() to client.NewWorkflow(), or pass the same option to client.NewStandaloneTask(), where it is accepted as a StandaloneTaskOption:
events.go
package main

import (
    "context"
    "log"
    "time"

    hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

type EventInput struct {
    UserID string `json:"user_id"`
    Action string `json:"action"`
}

type ProcessOutput struct {
    ProcessedAt string `json:"processed_at"`
    UserID      string `json:"user_id"`
}

func main() {
    client, err := hatchet.NewClient()
    if err != nil {
        log.Fatal(err)
    }

    task := client.NewStandaloneTask(
        "process-user-event",
        func(ctx hatchet.Context, input EventInput) (ProcessOutput, error) {
            return ProcessOutput{
                ProcessedAt: time.Now().Format(time.RFC3339),
                UserID:      input.UserID,
            }, nil
        },
        hatchet.WithWorkflowEvents("user:created", "user:updated"),
    )

    _ = task

    // Push an event programmatically
    err = client.Events().Push(context.Background(), "user:created", EventInput{
        UserID: "user-123",
        Action: "created",
    })
    if err != nil {
        log.Fatal(err)
    }
}

Cron triggers

Declare recurring cron schedules directly on a workflow or standalone task:
// On a workflow
workflow := client.NewWorkflow("nightly-report",
    hatchet.WithWorkflowCron("0 0 * * *"),
    hatchet.WithWorkflowCronInput(map[string]interface{}{"full": true}),
)

// On a standalone task
task := client.NewStandaloneTask(
    "send-digest",
    myFn,
    hatchet.WithWorkflowCron("0 9 * * 1-5"), // weekdays at 9 AM
)

Failure handlers

Call workflow.OnFailure() or task.OnFailure() to register a function that runs when any task in the workflow fails:
workflow.OnFailure(func(ctx hatchet.Context, input Input) (*Output, error) {
    errors := ctx.StepRunErrors()
    log.Printf("Workflow failed: %v", errors)
    return nil, nil
})

Spawning child workflows

From within a task, use ctx.SpawnWorkflow() or call .Run() on another workflow or standalone task passing the hatchet.Context directly:
parent := workflow.NewTask("parent-task", func(ctx hatchet.Context, input Input) (*Output, error) {
    // Pass the hatchet.Context to run a child workflow and wait for its result
    result, err := childTask.Run(ctx, ChildInput{Value: input.Value})
    if err != nil {
        return nil, err
    }

    var childOutput ChildOutput
    if err := result.Into(&childOutput); err != nil {
        return nil, err
    }

    return &Output{Result: childOutput.Result}, nil
})
