
Creating a worker

Call client.NewWorker(name, options...) to create a worker and register your workflows with it.
worker.go
package main

import (
    "log"

    "github.com/hatchet-dev/hatchet/pkg/cmdutils"
    hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

func main() {
    client, err := hatchet.NewClient()
    if err != nil {
        log.Fatal(err)
    }

    // Declare your workflows before creating the worker.
    // myFn is your task function, defined elsewhere.
    task := client.NewStandaloneTask("my-task", myFn)

    worker, err := client.NewWorker("my-worker",
        hatchet.WithWorkflows(task),
        hatchet.WithSlots(100),
    )
    if err != nil {
        log.Fatal(err)
    }

    interruptCtx, cancel := cmdutils.NewInterruptContext()
    defer cancel()

    if err := worker.StartBlocking(interruptCtx); err != nil {
        log.Fatal(err)
    }
}

Signature

func (c *Client) NewWorker(name string, options ...WorkerOption) (*Worker, error)

Worker options

WithWorkflows(workflows ...WorkflowBase) WorkerOption
Registers workflows and standalone tasks with the worker. Both *Workflow and *StandaloneTask implement WorkflowBase.
hatchet.WithWorkflows(workflowA, workflowB, standaloneTask)
WithSlots(slots int) WorkerOption
Maximum number of concurrent standard task runs this worker will accept. Defaults to 100 if not set.
WithDurableSlots(durableSlots int) WorkerOption
Maximum number of concurrent durable task runs this worker will accept. Defaults to 1000 if not set.
WithLabels(labels map[string]any) WorkerOption
Key-value labels attached to this worker for affinity-based task routing. Values can be strings or numbers.
hatchet.WithLabels(map[string]any{
    "model": "gpu-xl",
    "memory": 512,
})
WithLogger(logger *zerolog.Logger) WorkerOption
Sets a custom zerolog.Logger for the worker’s internal log output.
WithPanicHandler(fn func(ctx hatchet.Context, recovered any)) WorkerOption
Sets a custom panic handler. Called whenever a task function panics, with the value returned by recover() passed as recovered.
hatchet.WithPanicHandler(func(ctx hatchet.Context, recovered any) {
    log.Printf("task panicked: %v", recovered)
})

Starting the worker

worker.StartBlocking(ctx context.Context) error

Connects to Hatchet, begins processing tasks, and blocks until ctx is cancelled. This is the most common way to run a worker in a main() function.
interruptCtx, cancel := cmdutils.NewInterruptContext()
defer cancel()

if err := worker.StartBlocking(interruptCtx); err != nil {
    log.Fatalf("worker stopped: %v", err)
}
cmdutils.NewInterruptContext() returns a context that is cancelled when the process receives SIGINT or SIGTERM, enabling graceful shutdown.

worker.Start() (func() error, error)

Non-blocking variant. Connects to Hatchet and starts processing tasks in the background. Returns a cleanup function to call when you want to stop the worker.
cleanup, err := worker.Start()
if err != nil {
    log.Fatal(err)
}

// ... do other work ...

// Stop the worker and wait for in-flight tasks to complete
if err := cleanup(); err != nil {
    log.Printf("cleanup error: %v", err)
}
This is useful in tests or when you need to trigger workflows programmatically before blocking:
package main

import (
    "context"
    "log"

    "github.com/hatchet-dev/hatchet/pkg/cmdutils"
    hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

func main() {
    client, err := hatchet.NewClient()
    if err != nil {
        log.Fatal(err)
    }

    workflow := client.NewWorkflow("dag-workflow")
    // ... define tasks ...

    worker, err := client.NewWorker("dag-worker", hatchet.WithWorkflows(workflow))
    if err != nil {
        log.Fatal(err)
    }

    interruptCtx, cancel := cmdutils.NewInterruptContext()
    defer cancel()

    // Start non-blocking so we can also trigger a run
    cleanup, err := worker.Start()
    if err != nil {
        log.Fatal(err)
    }

    // Trigger a run after the worker is started.
    // myInput is a placeholder for your workflow's input value.
    _, err = workflow.Run(context.Background(), myInput)
    if err != nil {
        log.Fatal(err)
    }

    // Block until SIGINT or SIGTERM, then stop the worker
    <-interruptCtx.Done()
    if err := cleanup(); err != nil {
        log.Printf("cleanup error: %v", err)
    }
}

Worker lifecycle and graceful shutdown

When ctx is cancelled (or when the cleanup function from Start() is called), the worker:
  1. Stops accepting new tasks.
  2. Waits for all in-flight task executions to complete.
  3. Disconnects from the Hatchet server.
This means you can safely use SIGINT / SIGTERM to stop a worker without losing work that is already in progress.
Avoid killing a worker process with SIGKILL. A process cannot intercept SIGKILL, so the shutdown sequence above never runs. In-flight tasks will be marked as failed and retried according to their retry configuration.

Middleware

Attach middleware functions that wrap every task execution on this worker using worker.Use():
worker.Use(func(ctx hatchet.Context, next func(hatchet.Context) error) error {
    log.Printf("task starting: %s", ctx.StepName())
    err := next(ctx)
    log.Printf("task finished: %s err=%v", ctx.StepName(), err)
    return err
})
Multiple middleware functions are executed in registration order.

Panic handling

By default, panics in task functions crash the worker process. Use WithPanicHandler to recover and log panics gracefully:
worker, err := client.NewWorker("my-worker",
    hatchet.WithWorkflows(task),
    hatchet.WithPanicHandler(func(ctx hatchet.Context, recovered any) {
        // Send to your error tracking service, e.g. Sentry
        log.Printf("panic in task %s: %v", ctx.StepName(), recovered)
    }),
)
The task run will be marked as failed and retried according to the task’s retry configuration.

Full example

worker.go
package main

import (
    "log"

    "github.com/hatchet-dev/hatchet/pkg/cmdutils"
    hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

type Input struct {
    Message string `json:"message"`
}

type Output struct {
    Result string `json:"result"`
}

func processMessage(ctx hatchet.Context, input Input) (Output, error) {
    ctx.Log("processing: " + input.Message)
    return Output{Result: "done: " + input.Message}, nil
}

func main() {
    client, err := hatchet.NewClient()
    if err != nil {
        log.Fatalf("failed to create client: %v", err)
    }

    task := client.NewStandaloneTask("process-message", processMessage)

    worker, err := client.NewWorker(
        "example-worker",
        hatchet.WithWorkflows(task),
        hatchet.WithSlots(50),
        hatchet.WithPanicHandler(func(ctx hatchet.Context, recovered any) {
            log.Printf("panic: %v", recovered)
        }),
    )
    if err != nil {
        log.Fatalf("failed to create worker: %v", err)
    }

    interruptCtx, cancel := cmdutils.NewInterruptContext()
    defer cancel()

    if err := worker.StartBlocking(interruptCtx); err != nil {
        log.Fatalf("worker stopped: %v", err)
    }
}
