# Creating a worker

Call `client.NewWorker(name, options...)` to create a worker and register your workflows with it.
```go
package main

import (
	"log"

	"github.com/hatchet-dev/hatchet/pkg/cmdutils"
	hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

func main() {
	client, err := hatchet.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	// Declare your workflows before creating the worker.
	// myFn is a task function, e.g. func(ctx hatchet.Context, input MyInput) (MyOutput, error).
	task := client.NewStandaloneTask("my-task", myFn)

	worker, err := client.NewWorker("my-worker",
		hatchet.WithWorkflows(task),
		hatchet.WithSlots(100),
	)
	if err != nil {
		log.Fatal(err)
	}

	interruptCtx, cancel := cmdutils.NewInterruptContext()
	defer cancel()

	if err := worker.StartBlocking(interruptCtx); err != nil {
		log.Fatal(err)
	}
}
```
## Signature

```go
func (c *Client) NewWorker(name string, options ...WorkerOption) (*Worker, error)
```
## Worker options

### WithWorkflows(workflows ...WorkflowBase)

Registers workflows and standalone tasks with the worker. Both `*Workflow` and `*StandaloneTask` implement `WorkflowBase`.

```go
hatchet.WithWorkflows(workflowA, workflowB, standaloneTask)
```
### WithSlots(slots int)

Maximum number of concurrent standard task runs this worker will accept. Defaults to 100 if not set.
### WithDurableSlots(durableSlots int)

Maximum number of concurrent durable task runs this worker will accept. Defaults to 1000 if not set.
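Conceptually, slots behave like a counting semaphore: each running task occupies one slot until it finishes, and tasks beyond the limit wait for a slot to free up. A minimal standalone sketch of that idea using a buffered channel (this is an illustration of the concept, not Hatchet's implementation):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWithSlots executes tasks with at most `slots` running concurrently,
// mirroring how a worker's slot count caps in-flight task runs.
// It returns the peak number of tasks observed running at once.
func runWithSlots(slots int, tasks []func()) int32 {
	sem := make(chan struct{}, slots) // each token is one occupied slot
	var wg sync.WaitGroup
	var current, peak int32

	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			sem <- struct{}{} // acquire a slot (blocks while all are taken)
			defer func() { <-sem }()

			// Track the high-water mark of concurrent executions.
			n := atomic.AddInt32(&current, 1)
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt32(&current, -1)
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 20)
	for i := range tasks {
		tasks[i] = func() {} // trivially fast task
	}
	// The peak concurrency never exceeds the slot count.
	fmt.Println(runWithSlots(5, tasks) <= 5)
}
```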
### WithLabels(labels map[string]any)

Key-value labels attached to this worker for affinity-based task routing. Values can be strings or numbers.

```go
hatchet.WithLabels(map[string]any{
	"model":  "gpu-xl",
	"memory": 512,
})
```
### WithLogger(logger *zerolog.Logger)

Sets a custom `zerolog.Logger` for the worker's internal log output.
### WithPanicHandler(fn func(ctx hatchet.Context, recovered any))

Sets a custom panic handler. Called whenever a task function panics, with the value returned by `recover()` passed as `recovered`.

```go
hatchet.WithPanicHandler(func(ctx hatchet.Context, recovered any) {
	log.Printf("task panicked: %v", recovered)
})
```
## Starting the worker

### worker.StartBlocking(ctx context.Context) error

Connects to Hatchet, begins processing tasks, and blocks until `ctx` is cancelled. This is the most common way to run a worker in a `main()` function.
```go
interruptCtx, cancel := cmdutils.NewInterruptContext()
defer cancel()

if err := worker.StartBlocking(interruptCtx); err != nil {
	log.Fatalf("worker stopped: %v", err)
}
```
`cmdutils.NewInterruptContext()` returns a context that is cancelled when the process receives SIGINT or SIGTERM, enabling graceful shutdown.
### worker.Start() (func() error, error)

Non-blocking variant. Connects to Hatchet and starts processing tasks in the background. Returns a cleanup function to call when you want to stop the worker.
```go
cleanup, err := worker.Start()
if err != nil {
	log.Fatal(err)
}

// ... do other work ...

// Stop the worker and wait for in-flight tasks to complete
if err := cleanup(); err != nil {
	log.Printf("cleanup error: %v", err)
}
```
This is useful in tests or when you need to trigger workflows programmatically before blocking:

```go
package main

import (
	"context"
	"log"

	"github.com/hatchet-dev/hatchet/pkg/cmdutils"
	hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

func main() {
	client, err := hatchet.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	workflow := client.NewWorkflow("dag-workflow")
	// ... define tasks ...

	worker, err := client.NewWorker("dag-worker", hatchet.WithWorkflows(workflow))
	if err != nil {
		log.Fatal(err)
	}

	// Start non-blocking so we can also trigger a run.
	cleanup, err := worker.Start()
	if err != nil {
		log.Fatal(err)
	}

	// Trigger a run now that the worker is started.
	_, err = workflow.Run(context.Background(), myInput)
	if err != nil {
		log.Fatal(err)
	}

	// Block until SIGINT/SIGTERM, then stop the worker gracefully.
	interruptCtx, cancel := cmdutils.NewInterruptContext()
	defer cancel()
	<-interruptCtx.Done()

	if err := cleanup(); err != nil {
		log.Printf("cleanup error: %v", err)
	}
}
```
## Worker lifecycle and graceful shutdown

When `ctx` is cancelled (or when the cleanup function from `Start()` is called), the worker:

- Stops accepting new tasks.
- Waits for all in-flight task executions to complete.
- Disconnects from the Hatchet server.

This means you can safely use SIGINT / SIGTERM to stop a worker without losing work that is already in progress.
Avoid killing a worker process with SIGKILL: the worker gets no chance to drain, and in-flight tasks will be marked as failed and retried according to their retry configuration.
## Middleware

Attach middleware functions that wrap every task execution on this worker using `worker.Use()`:

```go
worker.Use(func(ctx hatchet.Context, next func(hatchet.Context) error) error {
	log.Printf("task starting: %s", ctx.StepName())
	err := next(ctx)
	log.Printf("task finished: %s err=%v", ctx.StepName(), err)
	return err
})
```
Multiple middleware functions are executed in registration order.
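Registration order means the first middleware registered becomes the outermost wrapper. A standalone sketch of this composition pattern, using generic function types rather than the SDK's:

```go
package main

import "fmt"

// Middleware wraps a handler, running code before and/or after it.
type Middleware func(next func()) func()

// chain applies middleware in registration order: mws[0] is outermost.
func chain(handler func(), mws ...Middleware) func() {
	for i := len(mws) - 1; i >= 0; i-- {
		handler = mws[i](handler)
	}
	return handler
}

func main() {
	var order []string
	mw := func(name string) Middleware {
		return func(next func()) func() {
			return func() {
				order = append(order, name+" before")
				next()
				order = append(order, name+" after")
			}
		}
	}

	// "first" wraps "second", which wraps the task itself.
	chain(func() { order = append(order, "task") }, mw("first"), mw("second"))()
	fmt.Println(order) // [first before second before task second after first after]
}
```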
## Panic handling

By default, panics in task functions crash the worker process. Use `WithPanicHandler` to recover and log panics gracefully:
```go
worker, err := client.NewWorker("my-worker",
	hatchet.WithWorkflows(task),
	hatchet.WithPanicHandler(func(ctx hatchet.Context, recovered any) {
		// Send to your error tracking service, e.g. Sentry
		log.Printf("panic in task %s: %v", ctx.StepName(), recovered)
	}),
)
```
The task run will be marked as failed and retried according to the task’s retry configuration.
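Under the hood this is Go's standard defer/recover pattern. A minimal standalone sketch of how a runner can turn a panic into an error and hand the recovered value to a handler (an illustration, not the SDK's implementation):

```go
package main

import "fmt"

// safeRun executes fn, converting a panic into an error and passing the
// recovered value to handler, the same value a panic handler would see.
func safeRun(fn func() error, handler func(recovered any)) (err error) {
	defer func() {
		if r := recover(); r != nil {
			handler(r)
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()
	return fn()
}

func main() {
	err := safeRun(
		func() error { panic("boom") },
		func(recovered any) { fmt.Println("handler saw:", recovered) },
	)
	// The run is reported as a failure instead of crashing the process.
	fmt.Println(err)
}
```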
## Full example

```go
package main

import (
	"log"

	"github.com/hatchet-dev/hatchet/pkg/cmdutils"
	hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)

type Input struct {
	Message string `json:"message"`
}

type Output struct {
	Result string `json:"result"`
}

func processMessage(ctx hatchet.Context, input Input) (Output, error) {
	ctx.Log("processing: " + input.Message)
	return Output{Result: "done: " + input.Message}, nil
}

func main() {
	client, err := hatchet.NewClient()
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	task := client.NewStandaloneTask("process-message", processMessage)

	worker, err := client.NewWorker(
		"example-worker",
		hatchet.WithWorkflows(task),
		hatchet.WithSlots(50),
		hatchet.WithPanicHandler(func(ctx hatchet.Context, recovered any) {
			log.Printf("panic: %v", recovered)
		}),
	)
	if err != nil {
		log.Fatalf("failed to create worker: %v", err)
	}

	interruptCtx, cancel := cmdutils.NewInterruptContext()
	defer cancel()

	if err := worker.StartBlocking(interruptCtx); err != nil {
		log.Fatalf("worker stopped: %v", err)
	}
}
```