Skip to main content

Overview

The effect/unstable/workers modules provide tools for working with web workers:
  • Spawn workers from Effect programs
  • Type-safe message passing
  • Bidirectional communication
  • Worker lifecycle management

Defining a worker

Create a worker platform for your runtime:
import { Effect } from "effect"
import { WorkerPlatform, Worker } from "effect/unstable/workers"

// Node.js worker
import { Worker as NodeWorker } from "node:worker_threads"

const NodeWorkerPlatform = WorkerPlatform.of({
  spawn: (id: number) => Effect.gen(function*() {
    const spawner = yield* Worker.Spawner
    const worker = spawner(id) as NodeWorker
    
    return Worker.makeUnsafe({
      send: (message, transfers) =>
        Effect.sync(() => worker.postMessage(message, transfers as any)),
      
      run: (handler) => Effect.gen(function*() {
        worker.on("message", (msg) => {
          Effect.runFork(handler(msg))
        })
        
        worker.on("error", (error) => {
          Effect.runFork(Effect.fail(new WorkerError({ cause: error })))
        })
      })
    })
  })
})

Spawning workers

Create and use workers:
import { Effect, Layer } from "effect"
import { Worker, WorkerPlatform } from "effect/unstable/workers"

const program = Effect.gen(function*() {
  const platform = yield* WorkerPlatform.WorkerPlatform
  const worker = yield* platform.spawn(0)
  
  // Send message to worker
  yield* worker.send({ type: "process", data: [1, 2, 3] })
  
  // Receive messages from worker
  yield* worker.run((message) => {
    console.log("Received from worker:", message)
  })
})

Worker spawner

Provide a worker spawner function:
import { Worker } from "effect/unstable/workers"
import { Layer } from "effect"
import { Worker as NodeWorker } from "node:worker_threads"

const WorkerSpawner = Worker.layerSpawner<NodeWorker>((id) => {
  return new NodeWorker("./worker.js", {
    workerData: { id }
  })
})

Bidirectional communication

Send and receive messages:
const program = Effect.gen(function*() {
  const platform = yield* WorkerPlatform.WorkerPlatform
  const worker = yield* platform.spawn(0)
  
  // Send work to the worker
  yield* worker.send({
    type: "compute",
    numbers: [1, 2, 3, 4, 5]
  })
  
  // Process results from the worker
  yield* worker.run((message) => Effect.gen(function*() {
    if (message.type === "result") {
      yield* Effect.log(`Result: ${message.value}`)
    }
  }))
})

Worker lifecycle

Manage worker spawning and cleanup:
const program = Effect.scoped(
  Effect.gen(function*() {
    const platform = yield* WorkerPlatform.WorkerPlatform
    
    // Worker is automatically cleaned up when scope closes
    const worker = yield* platform.spawn(0)
    
    yield* worker.send({ type: "task", data: "process this" })
    
    // Work with the worker...
  })
)

Handling worker errors

import { WorkerError } from "effect/unstable/workers"

const program = Effect.gen(function*() {
  const worker = yield* platform.spawn(0)
  
  yield* worker.send({ type: "task" }).pipe(
    Effect.catchTag("WorkerError", (error) =>
      Effect.log(`Worker error: ${error.reason.message}`)
    )
  )
})

Transferable objects

Transfer ArrayBuffers and other transferables:
const buffer = new ArrayBuffer(1024)

yield* worker.send(
  { type: "process", buffer },
  [buffer]  // Transfer ownership
)

Worker pool

Create a pool of workers:
import { Effect, Array } from "effect"

const createWorkerPool = (size: number) =>
  Effect.gen(function*() {
    const platform = yield* WorkerPlatform.WorkerPlatform
    
    const workers = yield* Effect.all(
      Array.makeBy(size, (id) => platform.spawn(id))
    )
    
    return {
      workers,
      submit: (task: unknown) => {
        const worker = workers[Math.floor(Math.random() * workers.length)]
        return worker.send(task)
      }
    }
  })

const program = Effect.scoped(
  Effect.gen(function*() {
    const pool = yield* createWorkerPool(4)
    
    yield* pool.submit({ type: "compute", data: [1, 2, 3] })
  })
)

Complete example

import { Effect, Layer, Stream } from "effect"
import { Worker, WorkerPlatform } from "effect/unstable/workers"
import { Worker as NodeWorker } from "node:worker_threads"

// Worker spawner
const WorkerSpawner = Worker.layerSpawner<NodeWorker>((id) => {
  return new NodeWorker("./compute-worker.js", {
    workerData: { workerId: id }
  })
})

// Node.js worker platform
const NodeWorkerPlatform = Layer.effect(
  WorkerPlatform,
  Effect.gen(function*() {
    return WorkerPlatform.of({
      spawn: (id: number) => Effect.gen(function*() {
        const spawner = yield* Worker.Spawner
        const nodeWorker = spawner(id) as NodeWorker
        
        return Worker.makeUnsafe({
          send: (message, transfers) =>
            Effect.sync(() => {
              nodeWorker.postMessage(message, transfers as any)
            }),
          
          run: (handler) => Effect.async<never, WorkerError>((resume) => {
            nodeWorker.on("message", (msg) => {
              Effect.runFork(handler(msg))
            })
            
            nodeWorker.on("error", (error) => {
              resume(Effect.fail(new WorkerError({
                reason: new WorkerSendError({
                  message: "Worker error",
                  cause: error
                })
              })))
            })
          })
        })
      })
    })
  })
).pipe(
  Layer.provide(WorkerSpawner)
)

// Use workers
const program = Effect.scoped(
  Effect.gen(function*() {
    const platform = yield* WorkerPlatform.WorkerPlatform
    const worker = yield* platform.spawn(0)
    
    // Send computation task
    yield* worker.send({
      type: "compute",
      operation: "fibonacci",
      n: 40
    })
    
    // Wait for result
    yield* worker.run(
      (message) => Effect.gen(function*() {
        if (message.type === "result") {
          yield* Effect.log(`Fibonacci result: ${message.value}`)
        }
      }),
      {
        onSpawn: Effect.log("Worker spawned and ready")
      }
    )
  })
).pipe(
  Effect.provide(NodeWorkerPlatform)
)

Browser workers

// Browser worker platform
const BrowserWorkerPlatform = Layer.effect(
  WorkerPlatform,
  Effect.succeed(
    WorkerPlatform.of({
      spawn: (id: number) => Effect.gen(function*() {
        const worker = new Worker("/worker.js")
        
        return Worker.makeUnsafe({
          send: (message, transfers) =>
            Effect.sync(() => worker.postMessage(message, transfers)),
          
          run: (handler) => Effect.async(() => {
            worker.onmessage = (event) => {
              Effect.runFork(handler(event.data))
            }
          })
        })
      })
    })
  )
)

See also

  • Socket - WebSocket communication
  • RPC - Type-safe RPC calls

Build docs developers (and LLMs) love