Overview
Theeffect/unstable/workers modules provide tools for working with web workers:
- Spawn workers from Effect programs
- Type-safe message passing
- Bidirectional communication
- Worker lifecycle management
Defining a worker
Create a worker platform for your runtime:import { Effect } from "effect"
import { WorkerPlatform, Worker } from "effect/unstable/workers"
// Node.js worker
import { Worker as NodeWorker } from "node:worker_threads"
const NodeWorkerPlatform = WorkerPlatform.of({
spawn: (id: number) => Effect.gen(function*() {
const spawner = yield* Worker.Spawner
const worker = spawner(id) as NodeWorker
return Worker.makeUnsafe({
send: (message, transfers) =>
Effect.sync(() => worker.postMessage(message, transfers as any)),
run: (handler) => Effect.gen(function*() {
worker.on("message", (msg) => {
Effect.runFork(handler(msg))
})
worker.on("error", (error) => {
Effect.runFork(Effect.fail(new WorkerError({ cause: error })))
})
})
})
})
})
Spawning workers
Create and use workers:import { Effect, Layer } from "effect"
import { Worker, WorkerPlatform } from "effect/unstable/workers"
const program = Effect.gen(function*() {
const platform = yield* WorkerPlatform.WorkerPlatform
const worker = yield* platform.spawn(0)
// Send message to worker
yield* worker.send({ type: "process", data: [1, 2, 3] })
// Receive messages from worker
yield* worker.run((message) => {
console.log("Received from worker:", message)
})
})
Worker spawner
Provide a worker spawner function:import { Worker } from "effect/unstable/workers"
import { Layer } from "effect"
import { Worker as NodeWorker } from "node:worker_threads"
const WorkerSpawner = Worker.layerSpawner<NodeWorker>((id) => {
return new NodeWorker("./worker.js", {
workerData: { id }
})
})
Bidirectional communication
Send and receive messages:const program = Effect.gen(function*() {
const platform = yield* WorkerPlatform.WorkerPlatform
const worker = yield* platform.spawn(0)
// Send work to the worker
yield* worker.send({
type: "compute",
numbers: [1, 2, 3, 4, 5]
})
// Process results from the worker
yield* worker.run((message) => Effect.gen(function*() {
if (message.type === "result") {
yield* Effect.log(`Result: ${message.value}`)
}
}))
})
Worker lifecycle
Manage worker spawning and cleanup:const program = Effect.scoped(
Effect.gen(function*() {
const platform = yield* WorkerPlatform.WorkerPlatform
// Worker is automatically cleaned up when scope closes
const worker = yield* platform.spawn(0)
yield* worker.send({ type: "task", data: "process this" })
// Work with the worker...
})
)
Handling worker errors
import { WorkerError } from "effect/unstable/workers"
const program = Effect.gen(function*() {
const worker = yield* platform.spawn(0)
yield* worker.send({ type: "task" }).pipe(
Effect.catchTag("WorkerError", (error) =>
Effect.log(`Worker error: ${error.reason.message}`)
)
)
})
Transferable objects
Transfer ArrayBuffers and other transferables:const buffer = new ArrayBuffer(1024)
yield* worker.send(
{ type: "process", buffer },
[buffer] // Transfer ownership
)
Worker pool
Create a pool of workers:import { Effect, Array } from "effect"
const createWorkerPool = (size: number) =>
Effect.gen(function*() {
const platform = yield* WorkerPlatform.WorkerPlatform
const workers = yield* Effect.all(
Array.makeBy(size, (id) => platform.spawn(id))
)
return {
workers,
submit: (task: unknown) => {
const worker = workers[Math.floor(Math.random() * workers.length)]
return worker.send(task)
}
}
})
const program = Effect.scoped(
Effect.gen(function*() {
const pool = yield* createWorkerPool(4)
yield* pool.submit({ type: "compute", data: [1, 2, 3] })
})
)
Complete example
import { Effect, Layer, Stream } from "effect"
import { Worker, WorkerPlatform } from "effect/unstable/workers"
import { Worker as NodeWorker } from "node:worker_threads"
// Worker spawner
const WorkerSpawner = Worker.layerSpawner<NodeWorker>((id) => {
return new NodeWorker("./compute-worker.js", {
workerData: { workerId: id }
})
})
// Node.js worker platform
const NodeWorkerPlatform = Layer.effect(
WorkerPlatform,
Effect.gen(function*() {
return WorkerPlatform.of({
spawn: (id: number) => Effect.gen(function*() {
const spawner = yield* Worker.Spawner
const nodeWorker = spawner(id) as NodeWorker
return Worker.makeUnsafe({
send: (message, transfers) =>
Effect.sync(() => {
nodeWorker.postMessage(message, transfers as any)
}),
run: (handler) => Effect.async<never, WorkerError>((resume) => {
nodeWorker.on("message", (msg) => {
Effect.runFork(handler(msg))
})
nodeWorker.on("error", (error) => {
resume(Effect.fail(new WorkerError({
reason: new WorkerSendError({
message: "Worker error",
cause: error
})
})))
})
})
})
})
})
})
).pipe(
Layer.provide(WorkerSpawner)
)
// Use workers
const program = Effect.scoped(
Effect.gen(function*() {
const platform = yield* WorkerPlatform.WorkerPlatform
const worker = yield* platform.spawn(0)
// Send computation task
yield* worker.send({
type: "compute",
operation: "fibonacci",
n: 40
})
// Wait for result
yield* worker.run(
(message) => Effect.gen(function*() {
if (message.type === "result") {
yield* Effect.log(`Fibonacci result: ${message.value}`)
}
}),
{
onSpawn: Effect.log("Worker spawned and ready")
}
)
})
).pipe(
Effect.provide(NodeWorkerPlatform)
)
Browser workers
// Browser worker platform
const BrowserWorkerPlatform = Layer.effect(
WorkerPlatform,
Effect.succeed(
WorkerPlatform.of({
spawn: (id: number) => Effect.gen(function*() {
const worker = new Worker("/worker.js")
return Worker.makeUnsafe({
send: (message, transfers) =>
Effect.sync(() => worker.postMessage(message, transfers)),
run: (handler) => Effect.async(() => {
worker.onmessage = (event) => {
Effect.runFork(handler(event.data))
}
})
})
})
})
)
)