Custom Effect Constructors
Wrapping External APIs
Create Effect-based wrappers for callback-based APIs:

import { Effect } from "effect"
/**
 * Minimal callback-style file-system contract consumed by `readFile`.
 */
interface FileSystem {
// `callback` receives an `error` and a `data` argument; the types allow either to be `null`.
readFile: (path: string, callback: (error: Error | null, data: string | null) => void) => void
}
/**
 * Lifts the callback-based `fs.readFile` into an Effect.
 *
 * @param fs - File-system implementation whose `readFile` is invoked.
 * @param path - Path handed to `fs.readFile` unchanged.
 * @returns An effect that succeeds with the data passed to the callback, or
 * fails with the callback's error (or a "No data received" error when both
 * callback arguments are empty).
 */
const readFile = (fs: FileSystem, path: string): Effect.Effect<string, Error> =>
  Effect.async<string, Error>((resume) => {
    fs.readFile(path, (error, data) => {
      // Work out the single outcome first, then resume exactly once.
      const outcome: Effect.Effect<string, Error> = error
        ? Effect.fail(error)
        : data !== null
          ? Effect.succeed(data)
          : Effect.fail(new Error("No data received"))
      resume(outcome)
    })
  })
With Cleanup
import { Effect } from "effect"
/** Handle exposing a single `unsubscribe` operation that takes no arguments and returns nothing. */
interface Subscription {
unsubscribe: () => void
}
/**
 * Waits for the first message delivered on `topic`.
 *
 * The effect returned from the `Effect.async` callback only runs when the
 * fiber is interrupted, so the subscription is also released here as soon as
 * the first message arrives; otherwise it would stay registered after the
 * effect has completed.
 *
 * @param topic - Topic passed to `messageBus.subscribe`.
 * @returns An effect that succeeds with the first message received.
 */
const subscribe = (topic: string): Effect.Effect<string, Error> =>
  Effect.async<string, Error>((resume) => {
    // `registered` prevents touching `subscription` before it is assigned,
    // which would happen if the bus invoked the handler synchronously.
    let registered = false
    let delivered = false
    const subscription = messageBus.subscribe(topic, (message) => {
      // Only the first message can complete the effect; ignore the rest.
      if (delivered) return
      delivered = true
      if (registered) subscription.unsubscribe()
      resume(Effect.succeed(message))
    })
    registered = true
    // The handler already fired during `subscribe(...)`: release now.
    if (delivered) subscription.unsubscribe()
    // Interruption cleanup; skipped when delivery already unsubscribed.
    return Effect.sync(() => {
      if (!delivered) subscription.unsubscribe()
    })
  })
Effect.asyncEffect
When the registration itself is effectful:

import { Effect } from "effect"
/**
 * Waits for the first "change" or "error" event of a watcher created for `path`.
 *
 * The cleanup effect returned to `Effect.asyncEffect` runs on interruption
 * only, so the watcher is also closed as soon as the first event settles the
 * effect; otherwise it would remain open after completion.
 *
 * @param path - Path handed to `createWatcher`.
 * @returns An effect that succeeds with the content of the first "change"
 * event or fails with the first "error" event.
 */
const watchFile = (path: string): Effect.Effect<string, Error> =>
  Effect.asyncEffect<string, Error>((resume) =>
    Effect.gen(function*() {
      // Effectful setup
      const watcher = yield* createWatcher(path)
      let settled = false
      // Resume at most once and release the watcher at that point.
      // NOTE(review): assumes `watcher.close()` may be called from inside an
      // event handler — confirm against the watcher implementation.
      const settle = (outcome: Effect.Effect<string, Error>) => {
        if (settled) return
        settled = true
        watcher.close()
        resume(outcome)
      }
      watcher.on("change", (content) => {
        settle(Effect.succeed(content))
      })
      watcher.on("error", (error) => {
        settle(Effect.fail(error))
      })
      // Interruption cleanup; skipped when an event already closed the watcher.
      return Effect.sync(() => {
        if (!settled) watcher.close()
      })
    })
  )
Custom Operators
Retry with Backoff
import { Effect, Schedule } from "effect"
/**
 * Retries `self` on failure using exponential backoff.
 *
 * @param self - Effect to run and, on failure, retry.
 * @param options.times - Maximum number of retries (`Schedule.recurs`).
 * @param options.initialDelay - Base delay of the backoff. Typed as whatever
 * `Schedule.exponential` accepts as its first argument, so literals such as
 * `"100 millis"` are checked at compile time.
 * @param options.factor - Growth factor applied to the delay between attempts.
 * @returns The same effect with retries applied; when it still fails after the
 * schedule is exhausted, the final error is logged and propagated unchanged.
 */
const retryWithBackoff = <A, E, R>(
  self: Effect.Effect<A, E, R>,
  options: {
    times: number
    initialDelay: Parameters<typeof Schedule.exponential>[0]
    factor: number
  }
): Effect.Effect<A, E, R> => {
  // `factor` was previously accepted but never forwarded to the schedule.
  const schedule = Schedule.exponential(options.initialDelay, options.factor).pipe(
    Schedule.compose(Schedule.recurs(options.times))
  )
  return self.pipe(
    Effect.retry(schedule),
    // Runs only once the retries are exhausted and the effect still fails.
    Effect.tapError((error) => Effect.log(`Retry failed: ${error}`))
  )
}
// Usage
// `retryWithBackoff` takes the effect as its first argument, so it is called
// directly instead of being passed a single argument inside `.pipe(...)`.
const program = retryWithBackoff(fetchData(), {
  times: 3,
  initialDelay: "100 millis",
  factor: 2
})
Timeout with Fallback
import { Effect, Duration } from "effect"
/**
 * Runs `self` with a time limit and switches to `fallback` when the limit is hit.
 *
 * `Effect.timeout` signals a timeout through the error channel rather than by
 * producing an `Option`, so the success/timeout split is expressed with
 * `Effect.timeoutTo`, which maps both outcomes to a value explicitly.
 *
 * @param self - Effect subject to the time limit.
 * @param duration - Maximum time `self` may take.
 * @param fallback - Effect run in place of `self` when the limit is exceeded.
 * @returns The result of `self` if it finishes in time, otherwise the result
 * of `fallback`; failures of either effect are propagated.
 */
const timeoutOr = <A, E, R, A2, E2, R2>(
  self: Effect.Effect<A, E, R>,
  duration: Duration.DurationInput,
  fallback: Effect.Effect<A2, E2, R2>
): Effect.Effect<A | A2, E | E2, R | R2> =>
  Effect.flatten(
    Effect.timeoutTo(self, {
      duration,
      // Both branches are annotated with the same type so that `flatten`
      // sees a single inner effect type instead of a union of two.
      onSuccess: (value): Effect.Effect<A | A2, E2, R2> => Effect.succeed(value),
      onTimeout: (): Effect.Effect<A | A2, E2, R2> => fallback
    })
  )
// Usage
// `timeoutOr` is data-first: the effect to guard is its first argument.
const program = timeoutOr(
  slowOperation(),
  "5 seconds",
  Effect.succeed("default value")
)
Memoization
import { Deferred, Effect, Exit, Ref } from "effect"
/**
 * Builds a memoized version of `effect`.
 *
 * The outer effect captures the environment `R` once; the inner effect it
 * yields runs `effect` at most once and replays the stored outcome (success,
 * failure or interruption) to every subsequent or concurrent caller.
 *
 * @param effect - Computation whose outcome should be computed once and shared.
 * @returns An effect producing the memoized effect, which needs no environment.
 */
const memoize = <A, E, R>(
  effect: Effect.Effect<A, E, R>
): Effect.Effect<Effect.Effect<A, E>, never, R> =>
  Effect.gen(function*() {
    // Capture the environment now so the memoized effect requires nothing.
    const context = yield* Effect.context<R>()
    // The cast only narrows `Exclude<R, R>` to `never`; the compiler may not
    // be able to reduce that for a generic `R`.
    const provided = Effect.provide(effect, context) as Effect.Effect<A, E>
    const result = yield* Deferred.make<A, E>()
    const started = yield* Ref.make(false)
    return Effect.uninterruptibleMask((restore) =>
      Effect.gen(function*() {
        // Atomic test-and-set: exactly one caller observes `false` and runs
        // the computation, even when several fibers arrive at the same time.
        const alreadyStarted = yield* Ref.getAndSet(started, true)
        if (!alreadyStarted) {
          // `Deferred.complete` stores the exit of the effect. Claiming the
          // slot and completing the deferred happen without an interruption
          // gap, so waiters cannot be left hanging; the computation itself
          // stays interruptible through `restore`.
          yield* Deferred.complete(result, restore(provided))
        }
        return yield* restore(Deferred.await(result))
      })
    )
  })
// Usage
const program = Effect.gen(function*() {
  const cachedComputation = yield* memoize(expensiveComputation())
  // Initial evaluation performs the computation
  const first = yield* cachedComputation
  // Later evaluations reuse the stored value
  const second = yield* cachedComputation
})
Custom Effect Types
Resource Pool
import { Context, Effect, Queue, Ref, Scope } from "effect"
/**
 * Context tag (key "ResourcePool") for a pool of resources of type `R`.
 *
 * NOTE(review): the base-class expression below references the class type
 * parameter `R`; TypeScript normally rejects that ("Base class expressions
 * cannot reference class type parameters") — confirm, and consider a
 * non-generic tag or a per-resource tag factory.
 */
class ResourcePool<R> extends Context.Tag("ResourcePool")<
ResourcePool<R>,
{
// Yields a resource; the effect requires a `Scope`.
acquire: () => Effect.Effect<R, never, Scope.Scope>
// Runs `f` with a resource and yields whatever `f` yields.
withResource: <A, E>(f: (resource: R) => Effect.Effect<A, E>) => Effect.Effect<A, E>
}
>() {}
/**
 * Creates a fixed-size pool of resources tied to the surrounding scope.
 *
 * Every resource is created with `Effect.acquireRelease`, so each one is
 * destroyed when the scope closes — including resources created before a later
 * `create()` call fails, and resources that are checked out at close time.
 *
 * @param create - Effect factory producing one resource.
 * @param destroy - Finalizer invoked once per created resource.
 * @param size - Number of resources to create and capacity of the queue.
 * @returns The pool service built with `ResourcePool.of`.
 * NOTE(review): the declared success type names the tag class rather than its
 * service shape — confirm the intended type.
 */
const makeResourcePool = <R, E>(
  create: () => Effect.Effect<R, E>,
  destroy: (resource: R) => Effect.Effect<void>,
  size: number
): Effect.Effect<ResourcePool<R>, E, Scope.Scope> =>
  Effect.gen(function*() {
    const pool = yield* Queue.bounded<R>(size)

    // Initialize pool; each resource registers its own `destroy` finalizer.
    for (let i = 0; i < size; i++) {
      const resource = yield* Effect.acquireRelease(create(), destroy)
      yield* Queue.offer(pool, resource)
    }

    // Registered last so it runs first on scope close: the queue stops
    // handing out resources before they are destroyed.
    yield* Effect.addFinalizer(() => Queue.shutdown(pool))

    // Hands a resource back. Offering to a shut-down queue would interrupt the
    // caller, and the resource is destroyed by its own finalizer in that case.
    const release = (resource: R) =>
      Effect.gen(function*() {
        const closed = yield* Queue.isShutdown(pool)
        if (!closed) {
          yield* Queue.offer(pool, resource)
        }
      })

    // Waiting for a resource stays interruptible (`restore`), but once one has
    // been taken the release finalizer is registered without an interruption gap.
    const acquire = (): Effect.Effect<R, never, Scope.Scope> =>
      Effect.uninterruptibleMask((restore) =>
        Effect.gen(function*() {
          const resource = yield* restore(Queue.take(pool))
          yield* Effect.addFinalizer(() => release(resource))
          return resource
        })
      )

    return ResourcePool.of({
      acquire,
      // The resource goes back to the pool when the local scope closes,
      // whether `f` succeeds, fails or is interrupted.
      withResource: <A, E2>(f: (resource: R) => Effect.Effect<A, E2>) =>
        Effect.scoped(Effect.flatMap(acquire(), f))
    })
  })
// Usage
const program = Effect.gen(function*() {
  // `makeResourcePool` resolves to the object built by `ResourcePool.of`,
  // i.e. the pool service itself, so it is used directly; it is not an
  // effect and cannot be yielded again.
  const pool = yield* makeResourcePool(
    () => createDatabaseConnection(),
    (conn) => conn.close(),
    10
  )
  // Use resource from pool
  const result = yield* pool.withResource((conn) =>
    conn.query("SELECT * FROM users")
  )
})
Circuit Breaker
import { Context, Effect, Ref } from "effect"
/**
 * State of the circuit breaker, discriminated by `_tag`.
 * `openedAt` on the "Open" variant is a `Date.now()` timestamp (milliseconds)
 * recorded by `makeCircuitBreaker` when the circuit opens.
 */
type CircuitState =
| { _tag: "Closed" }
| { _tag: "Open"; openedAt: number }
| { _tag: "HalfOpen" }
/**
 * Context tag (key "CircuitBreaker") for the circuit-breaker service.
 */
class CircuitBreaker extends Context.Tag("CircuitBreaker")<
CircuitBreaker,
{
// Wraps `effect`; the result may additionally fail with a plain `Error`.
execute: <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E | Error, R>
}
>() {}
/**
 * Creates a circuit-breaker service.
 *
 * @param options.maxFailures - Failure count at which the circuit opens.
 * @param options.resetTimeout - Time in milliseconds (compared against
 * `Date.now()` differences) the circuit stays open before a trial call is
 * allowed.
 * @returns The service value produced by `CircuitBreaker.of`; the return type
 * is derived from that method so it describes the service shape rather than
 * the tag class.
 */
const makeCircuitBreaker = (
  options: {
    maxFailures: number
    resetTimeout: number
  }
): Effect.Effect<ReturnType<typeof CircuitBreaker.of>> =>
  Effect.gen(function*() {
    const state = yield* Ref.make<CircuitState>({ _tag: "Closed" })
    const failures = yield* Ref.make(0)

    // Runs `effect` and records the outcome. The effect is an explicit
    // parameter: it is only in scope inside `execute`, so a helper declared
    // beside `execute` cannot close over it.
    const attemptCall = <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
      effect.pipe(
        // Success resets the failure count and closes the circuit.
        Effect.tap(() =>
          Effect.gen(function*() {
            yield* Ref.set(failures, 0)
            yield* Ref.set(state, { _tag: "Closed" })
          })
        ),
        // Failure increments the count and opens the circuit at the threshold.
        Effect.tapError(() =>
          Effect.gen(function*() {
            const count = yield* Ref.updateAndGet(failures, (n) => n + 1)
            if (count >= options.maxFailures) {
              yield* Ref.set(state, {
                _tag: "Open",
                openedAt: Date.now()
              })
            }
          })
        )
      )

    const execute = <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E | Error, R> =>
      Effect.gen(function*() {
        const currentState = yield* Ref.get(state)
        if (currentState._tag === "Open") {
          const elapsed = Date.now() - currentState.openedAt
          // Still inside the cool-down window: reject without running the effect.
          if (elapsed <= options.resetTimeout) {
            return yield* Effect.fail(new Error("Circuit breaker is OPEN"))
          }
          // Cool-down elapsed: allow a trial call.
          yield* Ref.set(state, { _tag: "HalfOpen" })
        }
        return yield* attemptCall(effect)
      })

    return CircuitBreaker.of({ execute })
  })
Effect Aspects
Create reusable cross-cutting concerns:

import { Effect, Metric } from "effect"
/**
 * Builds an aspect that instruments an effect with metrics and a span.
 *
 * The type parameters live on the returned function so they are inferred from
 * the effect the aspect is applied to; nothing in `name` could determine them.
 *
 * @param name - Prefix for the metric names (`_calls`, `_duration`, `_errors`)
 * and the name of the span.
 * @returns A function that wraps an effect without changing its type.
 */
const tracked = (name: string) => {
  const counter = Metric.counter(`${name}_calls`)
  const duration = Metric.timer(`${name}_duration`)
  const errors = Metric.counter(`${name}_errors`)
  return <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
    effect.pipe(
      Metric.trackDuration(duration),
      // `trackSuccess` / `trackError` would feed the success or error value
      // itself into the counter; the `With` variants add a constant 1 instead.
      Metric.trackSuccessWith(counter, () => 1),
      Metric.trackErrorWith(errors, () => 1),
      Effect.withSpan(name)
    )
}
// Usage
const fetchUser = (id: number) => {
  // Build the value first, then lift it into an instrumented effect.
  const user = { id, name: "Alice" }
  return Effect.succeed(user).pipe(tracked("fetchUser"))
}
When building custom operators, use `Effect.gen` for complex logic and leverage existing Effect operators as building blocks.

Custom effect types should maintain referential transparency. Avoid side effects outside of Effect constructors.