The Worker module from @effect/platform provides worker pool management and background task execution.
Overview
import { Worker } from "@effect/platform"
interface Worker<I, O, E = never> {
readonly id: number
readonly execute: (message: I) => Stream<O, E | WorkerError>
readonly executeEffect: (message: I) => Effect<O, E | WorkerError>
}
interface WorkerPool<I, O, E = never> {
readonly backing: Pool<Worker<I, O, E>, WorkerError>
readonly broadcast: (message: I) => Effect<void, E | WorkerError>
readonly execute: (message: I) => Stream<O, E | WorkerError>
readonly executeEffect: (message: I) => Effect<O, E | WorkerError>
}
Worker Manager
makeManager
Create a worker manager.
const makeManager: Effect<WorkerManager, never, PlatformWorker>
layerManager
Provide the worker manager as a layer.
const layerManager: Layer<WorkerManager, never, PlatformWorker>
Creating Workers
Spawner
Provide a worker spawner function.
interface SpawnerFn<W = unknown> {
(id: number): W
}
const Spawner: Context.Tag<Spawner, SpawnerFn<unknown>>
layerSpawner
const layerSpawner: <W = unknown>(
spawner: SpawnerFn<W>
) => Layer<Spawner, never, never>
import { Worker } from "@effect/platform"
import { Layer } from "effect"
import { Worker as NodeWorker } from "node:worker_threads"
const SpawnerLive = Worker.layerSpawner((id) =>
new NodeWorker("./worker.js", { workerData: { id } })
)
Worker Pools
makePool
Create a worker pool.
const makePool: <I, O, E>(
options: WorkerPool.Options<I>
) => Effect<WorkerPool<I, O, E>, WorkerError, WorkerManager | Spawner | Scope>
type WorkerPool.Options<I> =
& Worker.Options<I>
& ({
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect<void, WorkerError>
readonly size: number
readonly concurrency?: number | undefined
readonly targetUtilization?: number | undefined
} | {
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect<void, WorkerError>
readonly minSize: number
readonly maxSize: number
readonly concurrency?: number | undefined
readonly targetUtilization?: number | undefined
readonly timeToLive: Duration.DurationInput
})
import { Worker } from "@effect/platform"
import { Effect } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
const pool = yield* Worker.makePool({
size: 4,
encode: (message) => Effect.succeed(message)
})
const result = yield* pool.executeEffect({ task: "process" })
})
)
makePoolLayer
Create a pool as a layer.
const makePoolLayer: <Tag, I, O, E>(
tag: Context.Tag<Tag, WorkerPool<I, O, E>>,
options: WorkerPool.Options<I>
) => Layer<Tag, WorkerError, WorkerManager | Spawner>
import { Worker } from "@effect/platform"
import { Context } from "effect"
interface MyTask {
readonly type: string
readonly data: unknown
}
interface MyWorkerPool extends Worker.WorkerPool<MyTask, string, never> {}
const MyWorkerPool = Context.GenericTag<MyWorkerPool>("MyWorkerPool")
const MyWorkerPoolLive = Worker.makePoolLayer(MyWorkerPool, {
size: 4
})
Pool Operations
execute
Execute a task in the pool (streaming).
const pool: WorkerPool<Input, Output, Error>
pool.execute(message) // Stream<Output, Error | WorkerError>
import { Worker } from "@effect/platform"
import { Effect, Stream } from "effect"
const program = Effect.gen(function* () {
const pool = yield* Worker.makePool({ size: 4 })
const results = pool.execute({ task: "process", data: [1, 2, 3] })
yield* Stream.runForEach(results, (result) =>
Effect.sync(() => console.log(result))
)
})
executeEffect
Execute a task in the pool (single result).
pool.executeEffect(message) // Effect<Output, Error | WorkerError>
import { Worker } from "@effect/platform"
import { Effect } from "effect"
const program = Effect.gen(function* () {
const pool = yield* Worker.makePool({ size: 4 })
const result = yield* pool.executeEffect({
operation: "sum",
numbers: [1, 2, 3, 4, 5]
})
console.log(result) // 15
})
broadcast
Broadcast a message to all workers.
pool.broadcast(message) // Effect<void, Error | WorkerError>
import { Worker } from "@effect/platform"
import { Effect } from "effect"
const program = Effect.gen(function* () {
const pool = yield* Worker.makePool({ size: 4 })
// Send configuration to all workers
yield* pool.broadcast({
type: "config",
settings: { debug: true }
})
})
Serialized Workers
Workers with schema-based serialization.
makeSerialized
Create a serialized worker.
const makeSerialized: <I extends Schema.TaggedRequest.All>(
options: SerializedWorker.Options<I>
) => Effect<SerializedWorker<I>, WorkerError, WorkerManager | Spawner | Scope>
makePoolSerialized
Create a serialized worker pool.
const makePoolSerialized: <I extends Schema.TaggedRequest.All>(
options: SerializedWorkerPool.Options<I>
) => Effect<SerializedWorkerPool<I>, WorkerError, WorkerManager | Spawner | Scope>
import { Worker } from "@effect/platform"
import { Effect, Schema } from "effect"
class ProcessTask extends Schema.TaggedRequest<ProcessTask>()("ProcessTask", {
failure: Schema.String,
success: Schema.Number,
payload: {
numbers: Schema.Array(Schema.Number)
}
}) {}
class ConfigTask extends Schema.TaggedRequest<ConfigTask>()("ConfigTask", {
failure: Schema.String,
success: Schema.Void,
payload: {
settings: Schema.Struct({
debug: Schema.Boolean
})
}
}) {}
type WorkerRequest = ProcessTask | ConfigTask
const program = Effect.scoped(
Effect.gen(function* () {
const pool = yield* Worker.makePoolSerialized<WorkerRequest>({
size: 4
})
const result = yield* pool.executeEffect(
new ProcessTask({ numbers: [1, 2, 3, 4, 5] })
)
console.log(result) // 15
})
)
makePoolSerializedLayer
Create a serialized pool as a layer.
const makePoolSerializedLayer: <Tag, I extends Schema.TaggedRequest.All>(
tag: Context.Tag<Tag, SerializedWorkerPool<I>>,
options: SerializedWorkerPool.Options<I>
) => Layer<Tag, WorkerError, WorkerManager | Spawner>
Worker Options
Worker.Options
interface Worker.Options<I> {
readonly encode?: ((message: I) => Effect<unknown, WorkerError>) | undefined
readonly initialMessage?: LazyArg<I> | undefined
}
import { Worker } from "@effect/platform"
import { Effect } from "effect"
const pool = Worker.makePool({
size: 4,
encode: (msg) => Effect.succeed(JSON.stringify(msg)),
initialMessage: () => ({ type: "init", timestamp: Date.now() })
})
onCreate
Callback run when a worker is created.
const pool = Worker.makePool({
size: 4,
onCreate: (worker) =>
Effect.sync(() => console.log(`Worker ${worker.id} created`))
})
Pool Sizing
Fixed Size
const pool = Worker.makePool({
size: 4,
concurrency: 10 // Tasks per worker
})
Dynamic Size
import { Worker } from "@effect/platform"
const pool = Worker.makePool({
minSize: 2,
maxSize: 8,
timeToLive: "60 seconds",
targetUtilization: 0.7 // Scale up at 70% utilization
})
Platform Workers
makePlatform
Create a platform-specific worker implementation.
const makePlatform: <W>() => <
P extends { readonly postMessage: (message: any, transfers?: any) => void }
>(options: {
readonly setup: (options: {
readonly worker: W
readonly scope: Scope
}) => Effect<P, WorkerError>
readonly listen: (options: {
readonly port: P
readonly emit: (data: any) => void
readonly deferred: Deferred<never, WorkerError>
readonly scope: Scope
}) => Effect<void>
}) => PlatformWorker
Examples
Basic Worker Pool
import { Worker } from "@effect/platform"
import { Effect, Layer } from "effect"
import { Worker as NodeWorker } from "node:worker_threads"
const SpawnerLive = Worker.layerSpawner((id) =>
new NodeWorker("./worker.js", { workerData: { id } })
)
const program = Effect.gen(function* () {
const pool = yield* Worker.makePool({
size: 4
})
const results = yield* pool.executeEffect({
operation: "process",
data: [1, 2, 3]
})
console.log(results)
}).pipe(
Effect.provide(Worker.layerManager),
Effect.provide(SpawnerLive)
)
Streaming Results
import { Worker } from "@effect/platform"
import { Effect, Stream } from "effect"
const program = Effect.gen(function* () {
const pool = yield* Worker.makePool({ size: 4 })
const stream = pool.execute({ task: "generate", count: 100 })
yield* Stream.runForEach(stream, (item) =>
Effect.sync(() => console.log(item))
)
})
Dynamic Pool Scaling
import { Worker } from "@effect/platform"
import { Effect } from "effect"
const program = Effect.gen(function* () {
const pool = yield* Worker.makePool({
minSize: 1,
maxSize: 10,
timeToLive: "30 seconds",
targetUtilization: 0.8,
onCreate: (worker) =>
Effect.sync(() => console.log(`Spawned worker ${worker.id}`))
})
// Pool automatically scales based on load
for (let i = 0; i < 100; i++) {
yield* pool.executeEffect({ task: "process", id: i })
}
})
Schema-Based Workers
import { Worker } from "@effect/platform"
import { Context, Effect, Layer, Schema } from "effect"
class SumRequest extends Schema.TaggedRequest<SumRequest>()("Sum", {
failure: Schema.String,
success: Schema.Number,
payload: {
numbers: Schema.Array(Schema.Number)
}
}) {}
type Requests = SumRequest
interface MyWorkerPool extends Worker.SerializedWorkerPool<Requests> {}
const MyWorkerPool = Context.GenericTag<MyWorkerPool>("MyWorkerPool")
const MyWorkerPoolLive = Worker.makePoolSerializedLayer(MyWorkerPool, {
size: 4
})
const program = Effect.gen(function* () {
const pool = yield* MyWorkerPool
const result = yield* pool.executeEffect(
new SumRequest({ numbers: [1, 2, 3, 4, 5] })
)
console.log(result) // 15
}).pipe(
Effect.provide(MyWorkerPoolLive)
)