Fiber - Lightweight Concurrency
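Fibers are lightweight virtual threads managed by the Effect runtime. They:
- Are extremely cheap to create (millions can exist)
- Are automatically interrupted when their parent completes (when forked with Effect.fork)
- Compose with Effect’s error handling and resource management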
fork - Concurrent Execution
import { Effect } from "effect"
// A slow child: it sleeps for 10 seconds and only then logs.
const child = Effect.sleep("10 seconds").pipe(
  Effect.andThen(Effect.sync(() => console.log("Child completed")))
)
// The parent forks the child, sleeps for 1 second, logs, and finishes.
const parent = Effect.fork(child).pipe(
  Effect.andThen(Effect.sleep("1 second")),
  Effect.andThen(Effect.sync(() => console.log("Parent completing")))
)
// The forked child is supervised by the parent fiber, so it is interrupted
// automatically as soon as the parent completes.
Effect.runPromise(parent)
// Output:
// Parent completing
// (child never logs because it's interrupted)
import { Effect, Schedule } from "effect"
// Background job: logs a heartbeat on a fixed 1-second schedule, forever.
const daemon = Effect.log("daemon: still running!").pipe(
  Effect.repeat(Schedule.fixed("1 second"))
)
// Wraps a console.log call in an effect so it can be sequenced in a pipeline.
const printLine = (line) => Effect.sync(() => console.log(line))
// The parent announces itself, launches the daemon, waits 3 seconds, and ends.
// forkDaemon detaches the fiber from the parent, so it is not interrupted
// when the parent finishes.
const parent = printLine("parent: started!").pipe(
  Effect.andThen(Effect.forkDaemon(daemon)),
  Effect.andThen(Effect.sleep("3 seconds")),
  Effect.andThen(printLine("parent: finished!"))
)
Effect.runFork(parent)
// Output:
// parent: started!
// daemon: still running!
// daemon: still running!
// daemon: still running!
// parent: finished!
// daemon: still running!
// daemon: still running!
// ...continues forever
race - First to Complete
import { Effect } from "effect"
// The fast contender: yields "task1" after 100ms.
const fastValue = Effect.delay(Effect.succeed("task1"), "100 millis")
const task1 = fastValue.pipe(
  Effect.tap(() => Effect.log("task1 done")),
  Effect.onInterrupt(() => Effect.log("task1 interrupted"))
)
// The slow contender: yields "task2" after 200ms.
const slowValue = Effect.delay(Effect.succeed("task2"), "200 millis")
const task2 = slowValue.pipe(
  Effect.tap(() => Effect.log("task2 done")),
  Effect.onInterrupt(() => Effect.log("task2 interrupted"))
)
// task1 wins the race; the loser (task2) is interrupted, which triggers its
// onInterrupt handler.
const contest = Effect.race(task1, task2)
Effect.runPromise(contest)
// Output:
// task1 done
// task2 interrupted
import { Effect } from "effect"
// Each entry is [value, delay]; every task succeeds with its value after the delay.
const [task1, task2, task3] = [
  ["task1", "100 millis"],
  ["task2", "200 millis"],
  ["task3", "150 millis"]
].map(([value, delay]) => Effect.delay(Effect.succeed(value), delay))
// raceAll runs every task and yields the result of the winner.
const program = Effect.raceAll([task1, task2, task3])
Effect.runPromise(program).then((winner) => console.log(winner))
// Output: task1
import { Effect } from "effect"
// `failing` fails first (after 100ms); `succeeding` succeeds later (after 200ms).
const failing = Effect.fail("error").pipe(
  Effect.delay("100 millis")
)
const succeeding = Effect.succeed("success").pipe(
  Effect.delay("200 millis")
)
// Effect.race yields the first effect to *succeed*: the early failure does not
// end the race, so the result comes from `succeeding`.
Effect.runPromise(Effect.race(failing, succeeding)).then(console.log)
// Output (after ~200ms): success
// Effect.race only fails if both effects fail. To settle with whichever effect
// finishes first, success or failure, use Effect.raceFirst(failing, succeeding),
// which fails with "error" after ~100ms.
all - Concurrent Combinations
import { Effect } from "effect"
// Each task yields its number after 100ms and logs its own name.
const task1 = Effect.tap(
  Effect.delay(Effect.succeed(1), "100 millis"),
  () => Effect.log("task1")
)
const task2 = Effect.tap(
  Effect.delay(Effect.succeed(2), "100 millis"),
  () => Effect.log("task2")
)
// With no concurrency option, Effect.all runs the tasks one after another
// and collects the results in input order.
const program = Effect.all([task1, task2])
Effect.runPromise(program).then((values) => console.log(values))
// Output (200ms total):
// task1
// task2
// [1, 2]
import { Effect } from "effect"
// Builds a task that yields `value` after 100ms and then logs `label`.
const delayedTask = (value, label) =>
  Effect.succeed(value).pipe(
    Effect.delay("100 millis"),
    Effect.tap(() => Effect.log(label))
  )
const task1 = delayedTask(1, "task1")
const task2 = delayedTask(2, "task2")
// "unbounded" lets every task start at once instead of one after another.
const inParallel = { concurrency: "unbounded" }
const program = Effect.all([task1, task2], inParallel)
Effect.runPromise(program).then((values) => console.log(values))
// Output (100ms total - tasks run in parallel):
// task1
// task2
// [1, 2]
import { Effect } from "effect"
// Upper bound on how many tasks may be in flight at the same time.
const MAX_IN_FLIGHT = 3
// Ten tasks (indices 0-9); each yields its index after 100ms and logs it.
const tasks = [...Array(10).keys()].map((index) =>
  Effect.delay(Effect.succeed(index), "100 millis").pipe(
    Effect.tap(() => Effect.log(`Task ${index}`))
  )
)
// Run at most MAX_IN_FLIGHT (3) tasks concurrently
const program = Effect.all(tasks, { concurrency: MAX_IN_FLIGHT })
import { Effect } from "effect"
// All three lookups target the same user.
const userId = 123
// Passing a struct (instead of an array) to Effect.all yields a struct of
// results under the same keys.
const requests = {
  user: fetchUser(userId),
  posts: fetchPosts(userId),
  comments: fetchComments(userId)
}
const program = Effect.all(requests, { concurrency: "unbounded" })
// Effect<{ user: User, posts: Post[], comments: Comment[] }>
const result = await Effect.runPromise(program)
const { user, posts, comments } = result
console.log(user, posts, comments)
import { Effect } from "effect"
// Three effects; the second one fails.
const effects = [
  Effect.succeed(1),
  Effect.fail("error"),
  Effect.succeed(3)
]
// Default mode short-circuits on the first failure.
const program = Effect.all(effects, { concurrency: "unbounded" })
// Fails with "error", task 3 is interrupted
import { Effect, Either } from "effect"
// Three effects; the second one fails.
const effects = [
  Effect.succeed(1),
  Effect.fail("error"),
  Effect.succeed(3)
]
// mode "either" never short-circuits: every effect runs and each outcome is
// reported as an Either (Right for success, Left for failure).
const collectAll = { mode: "either", concurrency: "unbounded" }
const program = Effect.all(effects, collectAll)
const results = await Effect.runPromise(program)
// [
// { _tag: 'Right', right: 1 },
// { _tag: 'Left', left: 'error' },
// { _tag: 'Right', right: 3 }
// ]
Interruption
Fibers can be interrupted to cancel ongoing work; for example, Fiber.interrupt(fiber) stops a forked fiber and waits for it to finish.
Interruption is cooperative. Effects can register cleanup logic using
onInterrupt or ensuring.
Best Practices
// Fiber must be imported alongside Effect: Fiber.join is used below.
import { Effect, Fiber } from "effect"
// ✅ Good: Fiber is child of parent scope
const good = Effect.gen(function* () {
  // Effect.fork ties the new fiber to the fiber running this effect.
  const fiber = yield* Effect.fork(task)
  // Fiber.join waits for the child and yields its result (or its failure).
  return yield* Fiber.join(fiber)
})
// Fiber is interrupted if parent is interrupted
// ❌ Bad: Daemon outlives its scope
const bad = Effect.gen(function* () {
  // forkDaemon detaches the fiber, and nothing keeps a handle to stop it later.
  yield* Effect.forkDaemon(task)
  // Daemon keeps running even if this effect completes
})
import { Effect } from "effect"
// ❌ Bad: Can overwhelm resources
// Every item of the dataset is processed at the same time.
const unboundedWork = largeDataset.map(processItem)
const bad = Effect.all(unboundedWork, { concurrency: "unbounded" })
// ✅ Good: Controlled concurrency
// At most 10 items are processed at any moment.
const boundedWork = largeDataset.map(processItem)
const good = Effect.all(boundedWork, { concurrency: 10 })
import { Effect } from "effect"
// Acquires a resource, registers its release as a finalizer, then uses it.
// Effect.addFinalizer attaches the finalizer to the surrounding Scope, so the
// effect must be run inside one (e.g. via Effect.scoped).
const withCleanup = Effect.gen(function* () {
  const handle = yield* acquireResource()
  const release = () => releaseResource(handle)
  yield* Effect.addFinalizer(release)
  const outcome = yield* useResource(handle)
  return outcome
})
// Finalizer runs even if interrupted