The Worker module provides abstractions for spawning and communicating with worker threads in a type-safe, platform-agnostic way. It supports bidirectional communication with automatic message handling and error propagation.

Overview

Worker features:
  • Type-safe message passing between main and worker threads
  • Bidirectional communication (send and receive)
  • Automatic error handling and propagation
  • Scope-based lifecycle management
  • Platform-agnostic abstractions
  • Support for transferable objects

Basic Concepts

The Worker system consists of:
  • Spawner: Factory for creating worker instances
  • Worker: Handle for communicating with a worker thread
  • WorkerPlatform: Platform-specific worker implementation
  • Message Protocol: Type-safe message passing

Creating Workers

Define a Spawner

First, define how workers should be spawned:
import { Worker } from "effect"

// For Node.js
import { Worker as NodeWorker } from "worker_threads"

const SpawnerLive = Worker.layerSpawner(
  (id: number) => new NodeWorker("./worker.js", { workerData: { id } })
)

Spawn a Worker

import { Effect, Worker } from "effect"

const program = Effect.gen(function*() {
  const platform = yield* Worker.WorkerPlatform
  
  // Spawn worker with ID 0
  const worker = yield* platform.spawn(0)
  
  // Use worker...
})

Message Passing

Sending Messages

Send messages from the main thread to the worker:
import { Effect, Worker } from "effect"

interface WorkerInput {
  readonly task: string
  readonly data: unknown
}

const program = Effect.gen(function*() {
  const platform = yield* Worker.WorkerPlatform
  const worker = yield* platform.spawn<unknown, WorkerInput>(0)
  
  // Send a message
  yield* worker.send({
    task: "process",
    data: { value: 42 }
  })
  
  // Send with transferables (for ArrayBuffer, MessagePort, etc.)
  const buffer = new ArrayBuffer(1024)
  yield* worker.send(
    { task: "process-buffer", data: buffer }, // matches WorkerInput: payload goes in `data`
    [buffer] // Transferred, not copied
  )
})
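To make "transferred, not copied" concrete, the following sketch uses the standard `structuredClone` global, which accepts the same kind of transfer list as `postMessage`. It involves no Effect APIs and only illustrates the ownership semantics, assuming a runtime that provides `structuredClone` (modern browsers, Node.js 17 or later).

```typescript
// Plain TypeScript illustration of transfer semantics; no Effect APIs involved.
const buffer = new ArrayBuffer(1024)

// Listing the buffer in `transfer` moves it instead of copying it,
// which is what a postMessage transfer list does as well.
const moved = structuredClone(buffer, { transfer: [buffer] })

// The source buffer is detached after the transfer and reports zero bytes.
const originalBytes = buffer.byteLength

// The receiving side owns the full allocation.
const movedBytes = moved.byteLength
```

The practical consequence is that the sender must not touch a buffer after passing it in the transfer list, because the memory no longer belongs to that thread.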

Receiving Messages

Handle messages arriving from the worker by calling run on the worker handle:
import { Console, Effect, Worker } from "effect"

interface WorkerOutput {
  readonly type: "result" | "progress"
  readonly data: unknown
}

const program = Effect.gen(function*() {
  const platform = yield* Worker.WorkerPlatform
  const worker = yield* platform.spawn<WorkerOutput, unknown>(0)
  
  // Run message handler
  yield* worker.run((message) =>
    Effect.gen(function*() {
      if (message.type === "result") {
        yield* Console.log("Got result:", message.data)
      } else if (message.type === "progress") {
        yield* Console.log("Progress:", message.data)
      }
    })
  )
})
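The handler above branches on `message.type`. One way to get stronger typing out of that pattern is to model the output as a discriminated union, so each branch sees only its own fields. The sketch below is a hypothetical refinement of `WorkerOutput` (the `percent` and `value` fields and the `formatOutput` helper are assumptions for illustration, not part of the module).

```typescript
// Hypothetical refinement of WorkerOutput as a discriminated union.
type WorkerOutput =
  | { readonly type: "progress"; readonly percent: number }
  | { readonly type: "result"; readonly value: unknown }

// Each case narrows `message` to the matching variant, so accessing
// `percent` on a "result" message would be a compile-time error.
const formatOutput = (message: WorkerOutput): string => {
  switch (message.type) {
    case "progress":
      return `Progress: ${message.percent}%`
    case "result":
      return `Got result: ${JSON.stringify(message.value)}`
  }
}
```

A function like this could be called from inside the `run` handler in place of the if/else chain.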

Worker Implementation

Inside the worker file, use WorkerRunner to handle messages:
// worker.ts
import { Console, Effect } from "effect"
import { WorkerRunner } from "effect/unstable/workers"

interface WorkerInput {
  readonly task: string
  readonly data: unknown
}

interface WorkerOutput {
  readonly type: "result" | "progress"
  readonly data: unknown
}

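// Placeholder for the real work; swap in your own task logic
const processTask = (message: WorkerInput) => Effect.succeed(message.data)

// Define message handler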
const handler = (message: WorkerInput) =>
  Effect.gen(function*() {
    yield* Console.log(`Processing task: ${message.task}`)
    
    // Send progress update
    yield* WorkerRunner.send<WorkerOutput>({
      type: "progress",
      data: { percent: 50 }
    })
    
    // Process the task
    const result = yield* processTask(message)
    
    // Send result
    yield* WorkerRunner.send<WorkerOutput>({
      type: "result",
      data: result
    })
  })

// Run the worker
WorkerRunner.run(handler)

Bidirectional Communication

Workers support full bidirectional message passing:
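// Main thread
import { Console, Effect, Worker } from "effect"

// Message shapes inferred from the calls below
interface MainToWorker {
  readonly command?: "start"
  readonly response?: string
}

interface WorkerToMain {
  readonly status: string
}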

const mainProgram = Effect.gen(function*() {
  const platform = yield* Worker.WorkerPlatform
  const worker = yield* platform.spawn<WorkerToMain, MainToWorker>(0)
  
  // Start message handler in background
  yield* Effect.forkDaemon(
    worker.run((message) =>
      Effect.gen(function*() {
        yield* Console.log("Worker says:", message)
        // Respond to worker
        yield* worker.send({ response: "acknowledged" })
      })
    )
  )
  
  // Send initial message
  yield* worker.send({ command: "start" })
})
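The worker thread handles the command and reports its status back:
// Worker thread
import { Effect } from "effect"
import { WorkerRunner } from "effect/unstable/workers"

// Same message shape as on the main thread
interface MainToWorker {
  readonly command?: "start"
  readonly response?: string
}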

const handler = (message: MainToWorker) =>
  Effect.gen(function*() {
    if (message.command === "start") {
      // Send message to main
      yield* WorkerRunner.send({ status: "started" })
      
      // The main thread's reply is sent from its worker.run handler
    }
  })

WorkerRunner.run(handler)

Spawn Callbacks

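Pass an onSpawn effect to run to execute code once the worker has spawned:
import { Console, Effect, Worker } from "effect"

// Placeholder handler for illustration
const processMessage = (message: unknown) => Console.log("Received:", message)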

const program = Effect.gen(function*() {
  const platform = yield* Worker.WorkerPlatform
  const worker = yield* platform.spawn(0)
  
  yield* worker.run(
    (message) => processMessage(message),
    {
      onSpawn: Console.log("Worker spawned and ready!")
    }
  )
})

Error Handling

Errors in workers are propagated as WorkerError:
import { Effect, Worker } from "effect"

const program = Effect.gen(function*() {
  const platform = yield* Worker.WorkerPlatform
  const worker = yield* platform.spawn(0)
  
  yield* worker.send({ task: "dangerous" }).pipe(
    Effect.catchTag("WorkerError", (error) =>
      Effect.gen(function*() {
        if (error.reason._tag === "WorkerSendError") {
          yield* Effect.log("Failed to send message:", error.reason.message)
        }
      })
    )
  )
})

WorkerError Types

  • WorkerSendError: Failed to send message to worker
  • Other platform-specific errors
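The nested `reason._tag` check used in the error-handling example can be modelled in plain TypeScript. The types below are a hypothetical local stand-in for the error shape, not the library's definitions; in particular, the `"Other"` tag is an assumed placeholder for platform-specific reasons.

```typescript
// Hypothetical local model of the error shape used in the example above.
// These are NOT the library's type definitions.
interface SendFailure {
  readonly _tag: "WorkerSendError"
  readonly message: string
}

interface OtherFailure {
  readonly _tag: "Other" // assumed stand-in for platform-specific reasons
  readonly message: string
}

interface WorkerErrorModel {
  readonly _tag: "WorkerError"
  readonly reason: SendFailure | OtherFailure
}

// Turn an error into a log line by switching on the nested reason tag.
const explain = (error: WorkerErrorModel): string => {
  switch (error.reason._tag) {
    case "WorkerSendError":
      return `Failed to send message: ${error.reason.message}`
    case "Other":
      return `Worker failed: ${error.reason.message}`
  }
}
```

Switching on the tag rather than using a single `if` makes it harder to forget a reason when new ones are added to the union.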

Complete Example

Main Thread

// main.ts
import { Console, Effect, Layer } from "effect"
import { Worker as NodeWorker } from "worker_threads"
import { Worker, WorkerPlatform } from "effect/unstable/workers"

interface TaskInput {
  readonly task: "compute" | "process"
  readonly value: number
}

interface TaskOutput {
  readonly type: "progress" | "result"
  readonly value: number
}

// Create spawner layer
const SpawnerLive = Worker.layerSpawner(
  (id: number) => new NodeWorker("./worker.js", { workerData: { id } })
)

// Create platform layer
const PlatformLive = Layer.effect(
  WorkerPlatform,
  Effect.map(
    Worker.Spawner,
    (spawner) => Worker.makePlatform(/* ... platform-specific setup ... */)
  )
).pipe(Layer.provide(SpawnerLive))

// Main program
const program = Effect.gen(function*() {
  const platform = yield* WorkerPlatform
  const worker = yield* platform.spawn<TaskOutput, TaskInput>(0)
  
  // Handle worker messages
  yield* Effect.forkDaemon(
    worker.run((message) =>
      Effect.gen(function*() {
        if (message.type === "progress") {
          yield* Console.log(`Progress: ${message.value}%`)
        } else if (message.type === "result") {
          yield* Console.log(`Result: ${message.value}`)
        }
      })
    )
  )
  
  // Send tasks to worker
  yield* worker.send({ task: "compute", value: 42 })
  yield* worker.send({ task: "process", value: 100 })
  
  // Give the worker time to reply (a fixed delay, not a real completion signal)
  yield* Effect.sleep("5 seconds")
}).pipe(Effect.provide(PlatformLive))

Effect.runPromise(program)

Worker Thread

// worker.ts
import { Console, Effect } from "effect"
import { WorkerRunner } from "effect/unstable/workers"

interface TaskInput {
  readonly task: "compute" | "process"
  readonly value: number
}

interface TaskOutput {
  readonly type: "progress" | "result"
  readonly value: number
}

const handler = (message: TaskInput) =>
  Effect.gen(function*() {
    yield* Console.log(`Worker received task: ${message.task}`)
    
    // Report progress
    yield* WorkerRunner.send<TaskOutput>({
      type: "progress",
      value: 50
    })
    
    // Simulate work
    yield* Effect.sleep("1 second")
    
    // Calculate result
    const result = message.task === "compute" 
      ? message.value * 2 
      : message.value + 10
    
    // Send result
    yield* WorkerRunner.send<TaskOutput>({
      type: "result",
      value: result
    })
  })

// Start the worker
WorkerRunner.run(handler)

Platform Integration

Node.js Worker Threads

import { Worker as NodeWorker } from "worker_threads"
import { Effect, Layer, Worker } from "effect"
import { NodeWorkerPlatform } from "@effect/platform-node"

const SpawnerLive = Worker.layerSpawner(
  (id: number) => new NodeWorker("./worker.js", {
    workerData: { id },
    env: process.env
  })
)

const PlatformLive = NodeWorkerPlatform.layer.pipe(
  Layer.provide(SpawnerLive)
)

Web Workers (Browser)

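import { Layer, Worker } from "effect"
import { BrowserWorkerPlatform } from "@effect/platform-browser"

// The imported Worker module shadows the browser's global Worker class,
// so reach the constructor through globalThis
const SpawnerLive = Worker.layerSpawner(
  (id: number) => new globalThis.Worker("./worker.js", { name: `worker-${id}` })
)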

const PlatformLive = BrowserWorkerPlatform.layer.pipe(
  Layer.provide(SpawnerLive)
)

Advanced Patterns

Worker Pool

Create a pool of workers for parallel processing:
import { Array, Effect, Worker } from "effect"

const createWorkerPool = (size: number) =>
  Effect.gen(function*() {
    const platform = yield* Worker.WorkerPlatform
    
    // Spawn multiple workers
    const workers = yield* Effect.all(
      Array.range(0, size - 1).map((id) => platform.spawn(id))
    )
    
    return workers
  })

const program = Effect.gen(function*() {
  const pool = yield* createWorkerPool(4)
  
  // Distribute work across pool
  const tasks = Array.range(0, 100)
  
  yield* Effect.all(
    tasks.map((task, index) => {
      const worker = pool[index % pool.length]
      return worker.send({ task })
    }),
    { concurrency: "unbounded" }
  )
})
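The distribution rule in the pool example is `index % pool.length`, i.e. round-robin. The sketch below isolates that rule in plain TypeScript to show how evenly it spreads the load; `assignRoundRobin` is a hypothetical helper, not part of the module. It assumes the 101 tasks produced by the inclusive `Array.range(0, 100)` and a pool of 4.

```typescript
// Count how many tasks each worker slot receives when task i goes to
// slot i % poolSize (the same rule as pool[index % pool.length]).
const assignRoundRobin = (taskCount: number, poolSize: number): number[] => {
  const perWorker: number[] = new Array<number>(poolSize).fill(0)
  for (let index = 0; index < taskCount; index++) {
    const slot = index % poolSize
    perWorker[slot] = (perWorker[slot] ?? 0) + 1
  }
  return perWorker
}

// 101 tasks over 4 workers: slot 0 receives one extra task.
const counts = assignRoundRobin(101, 4) // [26, 25, 25, 25]
```

Round-robin balances the number of tasks but not their cost; if tasks vary widely in duration, a queue that workers pull from may balance better.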

Worker with Scope

Manage worker lifecycle with scopes:
import { Effect, Worker } from "effect"

const program = Effect.scoped(
  Effect.gen(function*() {
    const platform = yield* Worker.WorkerPlatform
    const worker = yield* platform.spawn(0)
    
    // Worker automatically cleaned up when scope exits
    yield* worker.send({ task: "process" })
    
    // Process results...
  })
)
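The guarantee a scope provides is that cleanup runs when the scope exits, whether the body succeeds or fails. As a rough analogue in plain TypeScript (using try/finally rather than Effect's `Scope`, and a hypothetical `FakeWorker` that only records events), the ordering looks like this:

```typescript
// Plain TypeScript analogue of scoped cleanup. FakeWorker and withWorker
// are hypothetical and only record what happens, in order.
interface FakeWorker {
  send(message: string): void
  terminate(): void
}

const events: string[] = []

// Acquire a worker, run `use`, and always terminate the worker afterwards.
const withWorker = <A>(use: (worker: FakeWorker) => A): A => {
  events.push("spawn")
  const worker: FakeWorker = {
    send: (message) => { events.push(`send:${message}`) },
    terminate: () => { events.push("terminate") }
  }
  try {
    return use(worker)
  } finally {
    worker.terminate() // runs on success and on failure
  }
}

try {
  withWorker((worker) => {
    worker.send("process")
    throw new Error("task failed")
  })
} catch {
  events.push("error observed")
}
```

The worker is terminated before the caller observes the error, which is the same ordering the scoped example relies on for cleanup.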

Type Reference

Worker

interface Worker<O = unknown, I = unknown> {
  readonly send: (
    message: I,
    transfers?: ReadonlyArray<unknown>
  ) => Effect.Effect<void, WorkerError>
  
  readonly run: <A, E, R>(
    handler: (message: O) => Effect.Effect<A, E, R>,
    options?: {
      readonly onSpawn?: Effect.Effect<void> | undefined
    }
  ) => Effect.Effect<never, E | WorkerError, R>
}

WorkerPlatform

class WorkerPlatform extends Service<WorkerPlatform, {
  readonly spawn: <O = unknown, I = unknown>(
    id: number
  ) => Effect.Effect<Worker<O, I>, WorkerError, Spawner>
}> {}

Spawner

interface Spawner {
  readonly _: unique symbol
}

interface SpawnerFn<W = unknown> {
  (id: number): W
}

Message Types

type PlatformMessage = 
  | readonly [ready: 0] 
  | readonly [data: 1, unknown]
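Going by the tuple labels, a message whose first element is `0` signals readiness and one whose first element is `1` carries a payload. The sketch below shows how such a tagged tuple can be decoded with type narrowing. `describeMessage` is a hypothetical helper, and the `payload` label is added here only so that every tuple element is named.

```typescript
// Local copy of the message type with every tuple element labelled.
type PlatformMessage =
  | readonly [ready: 0]
  | readonly [data: 1, payload: unknown]

// Checking the first element narrows the tuple, so message[1] is only
// accessible on the data variant.
const describeMessage = (message: PlatformMessage): string =>
  message[0] === 0
    ? "ready"
    : `data: ${JSON.stringify(message[1])}`
```

Encoding the tag as a small integer in the first tuple slot keeps the envelope compact while still letting the compiler distinguish the two cases.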

See Also

  • Effect - Core Effect operations
  • Fiber - Lightweight concurrency
  • Scope - Resource management
  • Layer - Dependency injection
