Skip to main content
The Worker module from @effect/platform provides worker pool management and background task execution.

Overview

import { Worker } from "@effect/platform"

interface Worker<I, O, E = never> {
  readonly id: number
  readonly execute: (message: I) => Stream<O, E | WorkerError>
  readonly executeEffect: (message: I) => Effect<O, E | WorkerError>
}

interface WorkerPool<I, O, E = never> {
  readonly backing: Pool<Worker<I, O, E>, WorkerError>
  readonly broadcast: (message: I) => Effect<void, E | WorkerError>
  readonly execute: (message: I) => Stream<O, E | WorkerError>
  readonly executeEffect: (message: I) => Effect<O, E | WorkerError>
}

Worker Manager

makeManager

Create a worker manager.
const makeManager: Effect<WorkerManager, never, PlatformWorker>

layerManager

Worker manager as a layer.
const layerManager: Layer<WorkerManager, never, PlatformWorker>

Creating Workers

Spawner

Provide a worker spawner function.
interface SpawnerFn<W = unknown> {
  (id: number): W
}

const Spawner: Context.Tag<Spawner, SpawnerFn<unknown>>

layerSpawner

const layerSpawner: <W = unknown>(
  spawner: SpawnerFn<W>
) => Layer<Spawner, never, never>
Example:
import { Worker } from "@effect/platform"
import { Layer } from "effect"
import { Worker as NodeWorker } from "node:worker_threads"

const SpawnerLive = Worker.layerSpawner((id) =>
  new NodeWorker("./worker.js", { workerData: { id } })
)

Worker Pools

makePool

Create a worker pool.
const makePool: <I, O, E>(
  options: WorkerPool.Options<I>
) => Effect<WorkerPool<I, O, E>, WorkerError, WorkerManager | Spawner | Scope>
Options:
type WorkerPool.Options<I> =
  & Worker.Options<I>
  & ({
    readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect<void, WorkerError>
    readonly size: number
    readonly concurrency?: number | undefined
    readonly targetUtilization?: number | undefined
  } | {
    readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect<void, WorkerError>
    readonly minSize: number
    readonly maxSize: number
    readonly concurrency?: number | undefined
    readonly targetUtilization?: number | undefined
    readonly timeToLive: Duration.DurationInput
  })
Example:
import { Worker } from "@effect/platform"
import { Effect } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pool = yield* Worker.makePool({
      size: 4,
      encode: (message) => Effect.succeed(message)
    })

    const result = yield* pool.executeEffect({ task: "process" })
  })
)

makePoolLayer

Create pool as a layer.
const makePoolLayer: <Tag, I, O, E>(
  tag: Context.Tag<Tag, WorkerPool<I, O, E>>,
  options: WorkerPool.Options<I>
) => Layer<Tag, WorkerError, WorkerManager | Spawner>
Example:
import { Worker } from "@effect/platform"
import { Context } from "effect"

interface MyTask {
  readonly type: string
  readonly data: unknown
}

interface MyWorkerPool extends Worker.WorkerPool<MyTask, string, never> {}

const MyWorkerPool = Context.GenericTag<MyWorkerPool>("MyWorkerPool")

const MyWorkerPoolLive = Worker.makePoolLayer(MyWorkerPool, {
  size: 4
})

Pool Operations

execute

Execute task in pool (streaming).
const pool: WorkerPool<Input, Output, Error>

pool.execute(message) // Stream<Output, Error | WorkerError>
import { Worker } from "@effect/platform"
import { Effect, Stream } from "effect"

const program = Effect.gen(function* () {
  const pool = yield* Worker.makePool({ size: 4 })

  const results = pool.execute({ task: "process", data: [1, 2, 3] })

  yield* Stream.runForEach(results, (result) =>
    Effect.sync(() => console.log(result))
  )
})

executeEffect

Execute task in pool (single result).
pool.executeEffect(message) // Effect<Output, Error | WorkerError>
import { Worker } from "@effect/platform"
import { Effect } from "effect"

const program = Effect.gen(function* () {
  const pool = yield* Worker.makePool({ size: 4 })

  const result = yield* pool.executeEffect({
    operation: "sum",
    numbers: [1, 2, 3, 4, 5]
  })

  console.log(result) // 15
})

broadcast

Broadcast message to all workers.
pool.broadcast(message) // Effect<void, Error | WorkerError>
import { Worker } from "@effect/platform"
import { Effect } from "effect"

const program = Effect.gen(function* () {
  const pool = yield* Worker.makePool({ size: 4 })

  // Send configuration to all workers
  yield* pool.broadcast({
    type: "config",
    settings: { debug: true }
  })
})

Serialized Workers

Workers with schema-based serialization.

makeSerialized

Create serialized worker.
const makeSerialized: <I extends Schema.TaggedRequest.All>(
  options: SerializedWorker.Options<I>
) => Effect<SerializedWorker<I>, WorkerError, WorkerManager | Spawner | Scope>

makePoolSerialized

Create serialized worker pool.
const makePoolSerialized: <I extends Schema.TaggedRequest.All>(
  options: SerializedWorkerPool.Options<I>
) => Effect<SerializedWorkerPool<I>, WorkerError, WorkerManager | Spawner | Scope>
Example:
import { Worker } from "@effect/platform"
import { Effect, Schema } from "effect"

class ProcessTask extends Schema.TaggedRequest<ProcessTask>()("ProcessTask", {
  failure: Schema.String,
  success: Schema.Number,
  payload: {
    numbers: Schema.Array(Schema.Number)
  }
}) {}

class ConfigTask extends Schema.TaggedRequest<ConfigTask>()("ConfigTask", {
  failure: Schema.String,
  success: Schema.Void,
  payload: {
    settings: Schema.Struct({
      debug: Schema.Boolean
    })
  }
}) {}

type WorkerRequest = ProcessTask | ConfigTask

const program = Effect.scoped(
  Effect.gen(function* () {
    const pool = yield* Worker.makePoolSerialized<WorkerRequest>({
      size: 4
    })

    const result = yield* pool.executeEffect(
      new ProcessTask({ numbers: [1, 2, 3, 4, 5] })
    )

    console.log(result) // 15
  })
)

makePoolSerializedLayer

Serialized pool as a layer.
const makePoolSerializedLayer: <Tag, I extends Schema.TaggedRequest.All>(
  tag: Context.Tag<Tag, SerializedWorkerPool<I>>,
  options: SerializedWorkerPool.Options<I>
) => Layer<Tag, WorkerError, WorkerManager | Spawner>

Worker Options

Worker.Options

interface Worker.Options<I> {
  readonly encode?: ((message: I) => Effect<unknown, WorkerError>) | undefined
  readonly initialMessage?: LazyArg<I> | undefined
}
- encode: Custom message encoding applied before a message is sent to the worker.
- initialMessage: Initial message sent to the worker when it is spawned.
import { Worker } from "@effect/platform"
import { Effect } from "effect"

const pool = Worker.makePool({
  size: 4,
  encode: (msg) => Effect.succeed(JSON.stringify(msg)),
  initialMessage: () => ({ type: "init", timestamp: Date.now() })
})

onCreate

Callback when worker is created.
const pool = Worker.makePool({
  size: 4,
  onCreate: (worker) =>
    Effect.sync(() => console.log(`Worker ${worker.id} created`))
})

Pool Sizing

Fixed Size

const pool = Worker.makePool({
  size: 4,
  concurrency: 10 // Tasks per worker
})

Dynamic Size

import { Worker } from "@effect/platform"

const pool = Worker.makePool({
  minSize: 2,
  maxSize: 8,
  timeToLive: "60 seconds",
  targetUtilization: 0.7 // Scale up at 70% utilization
})

Platform Workers

makePlatform

Create platform-specific worker implementation.
const makePlatform: <W>() => <
  P extends { readonly postMessage: (message: any, transfers?: any) => void }
>(options: {
  readonly setup: (options: {
    readonly worker: W
    readonly scope: Scope
  }) => Effect<P, WorkerError>
  readonly listen: (options: {
    readonly port: P
    readonly emit: (data: any) => void
    readonly deferred: Deferred<never, WorkerError>
    readonly scope: Scope
  }) => Effect<void>
}) => PlatformWorker

Examples

Basic Worker Pool

import { Worker } from "@effect/platform"
import { Effect, Layer } from "effect"
import { Worker as NodeWorker } from "node:worker_threads"

const SpawnerLive = Worker.layerSpawner((id) =>
  new NodeWorker("./worker.js", { workerData: { id } })
)

const program = Effect.gen(function* () {
  const pool = yield* Worker.makePool({
    size: 4
  })

  const results = yield* pool.executeEffect({
    operation: "process",
    data: [1, 2, 3]
  })

  console.log(results)
}).pipe(
  Effect.provide(Worker.layerManager),
  Effect.provide(SpawnerLive)
)

Streaming Results

import { Worker } from "@effect/platform"
import { Effect, Stream } from "effect"

const program = Effect.gen(function* () {
  const pool = yield* Worker.makePool({ size: 4 })

  const stream = pool.execute({ task: "generate", count: 100 })

  yield* Stream.runForEach(stream, (item) =>
    Effect.sync(() => console.log(item))
  )
})

Dynamic Pool Scaling

import { Worker } from "@effect/platform"
import { Effect } from "effect"

const program = Effect.gen(function* () {
  const pool = yield* Worker.makePool({
    minSize: 1,
    maxSize: 10,
    timeToLive: "30 seconds",
    targetUtilization: 0.8,
    onCreate: (worker) =>
      Effect.sync(() => console.log(`Spawned worker ${worker.id}`))
  })

  // Pool automatically scales based on load
  for (let i = 0; i < 100; i++) {
    yield* pool.executeEffect({ task: "process", id: i })
  }
})

Schema-Based Workers

import { Worker } from "@effect/platform"
import { Context, Effect, Layer, Schema } from "effect"

class SumRequest extends Schema.TaggedRequest<SumRequest>()("Sum", {
  failure: Schema.String,
  success: Schema.Number,
  payload: {
    numbers: Schema.Array(Schema.Number)
  }
}) {}

type Requests = SumRequest

interface MyWorkerPool extends Worker.SerializedWorkerPool<Requests> {}
const MyWorkerPool = Context.GenericTag<MyWorkerPool>("MyWorkerPool")

const MyWorkerPoolLive = Worker.makePoolSerializedLayer(MyWorkerPool, {
  size: 4
})

const program = Effect.gen(function* () {
  const pool = yield* MyWorkerPool

  const result = yield* pool.executeEffect(
    new SumRequest({ numbers: [1, 2, 3, 4, 5] })
  )

  console.log(result) // 15
}).pipe(
  Effect.provide(MyWorkerPoolLive)
)

Build docs developers (and LLMs) love