Skip to main content
Learn to create custom effect types and operators that encapsulate domain-specific logic.

Custom Effect Constructors

Wrapping External APIs

Create Effect-based wrappers for callback-based APIs:
import { Effect } from "effect"

/**
 * Minimal callback-style file-system API used as the wrapping target below.
 *
 * `readFile` reports its outcome through `callback`, which receives either an
 * `error` or the file `data`; both are typed as nullable.
 */
interface FileSystem {
  readFile: (path: string, callback: (error: Error | null, data: string | null) => void) => void
}

/**
 * Adapts the callback-based `fs.readFile` into an Effect.
 *
 * @param fs - File-system implementation to delegate to.
 * @param path - Path handed to `fs.readFile` unchanged.
 * @returns An effect that succeeds with the file contents, or fails with the
 *   error reported by the callback (or a synthetic error when the callback
 *   reports neither an error nor data).
 */
const readFile = (fs: FileSystem, path: string): Effect.Effect<string, Error> =>
  Effect.async<string, Error>((resume) => {
    fs.readFile(path, (error, data) => {
      // Work out the single outcome first, then resume exactly once.
      const outcome: Effect.Effect<string, Error> = error
        ? Effect.fail(error)
        : data === null
          ? Effect.fail(new Error("No data received"))
          : Effect.succeed(data)

      resume(outcome)
    })
  })

With Cleanup

import { Effect } from "effect"

/** Handle for an active subscription; calling `unsubscribe` cancels it. */
interface Subscription {
  unsubscribe: () => void
}

/**
 * Subscribes to `topic` on `messageBus` and resumes with a received message.
 *
 * The effect returned from the registration callback is the cleanup action:
 * it unsubscribes from the bus.
 *
 * NOTE(review): `Effect.async` is expected to honour only the first `resume`
 * call, so this yields a single message rather than a stream — confirm that
 * is the intent.
 *
 * @param topic - Topic name passed to `messageBus.subscribe`.
 */
const subscribe = (topic: string): Effect.Effect<string, Error> =>
  Effect.async<string, Error>((resume) => {
    const handle = messageBus.subscribe(topic, (message) => resume(Effect.succeed(message)))

    // Cleanup: drop the subscription.
    const release = Effect.sync(() => handle.unsubscribe())
    return release
  })

Effect.asyncEffect

When the registration itself is effectful:
import { Effect } from "effect"

/**
 * Watches `path` and resumes with the content of a "change" event, or fails
 * with the error of an "error" event.
 *
 * The watcher is created effectfully via `createWatcher`, which is why
 * `Effect.asyncEffect` is used instead of `Effect.async`.
 *
 * @param path - Path handed to `createWatcher`.
 * @returns An effect producing the changed content; the effect returned by the
 *   registration closes the watcher.
 */
const watchFile = (path: string): Effect.Effect<string, Error> =>
  // The success and error types are pinned by annotating `resume` rather than
  // by passing a partial explicit type-argument list to `asyncEffect`.
  Effect.asyncEffect((resume: (effect: Effect.Effect<string, Error>) => void) =>
    Effect.gen(function*() {
      // Effectful setup
      const watcher = yield* createWatcher(path)

      watcher.on("change", (content) => {
        resume(Effect.succeed(content))
      })

      watcher.on("error", (error) => {
        resume(Effect.fail(error))
      })

      // Return cleanup; the block body keeps the finalizer's result `void`.
      return Effect.sync(() => {
        watcher.close()
      })
    })
  )

Custom Operators

Retry with Backoff

import { Effect, Schedule } from "effect"

/**
 * Retries `self` with exponential backoff, logging once if all retries fail.
 *
 * This is a data-first helper: pass the effect as the first argument.
 *
 * @param self - Effect to run and retry on failure.
 * @param options.times - Maximum number of retries after the first attempt.
 * @param options.initialDelay - Base delay of the backoff (e.g. `"100 millis"`).
 * @param options.factor - Growth factor applied to the delay on each retry.
 * @returns An effect with the same success, error and requirement types.
 */
const retryWithBackoff = <A, E, R>(
  self: Effect.Effect<A, E, R>,
  options: {
    times: number
    // Typed from `Schedule.exponential` itself so the accepted duration
    // inputs always match what the schedule constructor takes.
    initialDelay: Parameters<typeof Schedule.exponential>[0]
    factor: number
  }
): Effect.Effect<A, E, R> => {
  // `factor` is forwarded so the caller-supplied growth rate is honoured.
  const schedule = Schedule.exponential(options.initialDelay, options.factor).pipe(
    Schedule.compose(Schedule.recurs(options.times))
  )

  return self.pipe(
    Effect.retry(schedule),
    // Runs only when the retried effect still fails.
    Effect.tapError((error) =>
      Effect.log(`Retry failed: ${error}`)
    )
  )
}

// Usage
// `retryWithBackoff` takes the effect as its first argument; it is not a
// pipeable operator, so it is called directly rather than inside `.pipe(...)`.
const program = retryWithBackoff(fetchData(), {
  times: 3,
  initialDelay: "100 millis",
  factor: 2
})

Timeout with Fallback

import { Effect, Duration } from "effect"

/**
 * Runs `self` with a time limit and switches to `fallback` when it expires.
 *
 * This is a data-first helper: pass the effect as the first argument.
 *
 * @param self - Effect to run under the time limit.
 * @param duration - Maximum time `self` may take.
 * @param fallback - Effect to run instead when `self` times out.
 * @returns The result of `self` if it finishes in time, otherwise the result
 *   of `fallback`.
 */
const timeoutOr = <A, E, R, A2, E2, R2>(
  self: Effect.Effect<A, E, R>,
  duration: Duration.DurationInput,
  fallback: Effect.Effect<A2, E2, R2>
): Effect.Effect<A | A2, E | E2, R | R2> =>
  self.pipe(
    // `timeoutOption` reports a timeout as `None`, which the branch below
    // relies on; plain `Effect.timeout` signals it through the error channel.
    Effect.timeoutOption(duration),
    Effect.flatMap((option) =>
      option._tag === "Some"
        ? Effect.succeed(option.value)
        : fallback
    )
  )

// Usage
// `timeoutOr` is data-first: the effect to guard is its first argument.
const program = timeoutOr(
  slowOperation(),
  "5 seconds",
  Effect.succeed("default value")
)

Memoization

import { Context, Deferred, Effect, Exit, Ref } from "effect"

/**
 * Builds a memoized version of `effect`.
 *
 * The outer effect captures the environment `effect` needs and allocates the
 * cache; the inner effect it returns runs `effect` at most once and replays
 * the recorded exit (success or failure) to every later or concurrent caller.
 *
 * NOTE(review): if the fiber running the first computation is interrupted, the
 * cache is left in the "Pending" state — confirm whether waiters should be
 * released in that case.
 *
 * @param effect - Computation to memoize.
 * @returns An effect that yields the memoized, requirement-free effect.
 */
const memoize = <A, E, R>(
  effect: Effect.Effect<A, E, R>
): Effect.Effect<Effect.Effect<A, E>, never, R> =>
  Effect.gen(function*() {
    type CacheState =
      | { _tag: "Empty" }
      | { _tag: "Pending"; deferred: Deferred.Deferred<A, E> }
      | { _tag: "Done"; value: Exit.Exit<A, E> }

    // Capture the environment now so the returned effect no longer needs `R`,
    // which is what the declared return type promises.
    const context = yield* Effect.context<R>()
    const provided: Effect.Effect<A, E> = Effect.mapInputContext(
      effect,
      (_: Context.Context<never>) => context
    )

    const cache = yield* Ref.make<CacheState>({ _tag: "Empty" })

    // Runs the computation, records its exit, and wakes any waiters.
    const compute = (deferred: Deferred.Deferred<A, E>): Effect.Effect<A, E> =>
      Effect.gen(function*() {
        const exit = yield* Effect.exit(provided)

        yield* Ref.set(cache, { _tag: "Done", value: exit })
        yield* Deferred.complete(deferred, exit)

        return yield* exit
      })

    return Effect.gen(function*() {
      const fresh = yield* Deferred.make<A, E>()

      // Inspect and claim the cache in one atomic step so two concurrent
      // callers cannot both observe "Empty" and both run the computation.
      const next = yield* Ref.modify(
        cache,
        (current): readonly [Effect.Effect<A, E>, CacheState] => {
          switch (current._tag) {
            case "Done":
              return [current.value, current]

            case "Pending":
              return [Deferred.await(current.deferred), current]

            case "Empty":
              return [compute(fresh), { _tag: "Pending", deferred: fresh }]
          }
        }
      )

      return yield* next
    })
  })

// Usage
const program = Effect.gen(function*() {
  const cached = yield* memoize(expensiveComputation())

  // The first evaluation performs the computation...
  const first = yield* cached

  // ...and the next one is answered from the cache.
  const second = yield* cached
})

Custom Effect Types

Resource Pool

import { Context, Effect, Queue, Ref, Scope } from "effect"

/**
 * Service tag for a pool of resources of type `R`.
 *
 * The service exposes two operations:
 * - `acquire` hands out a resource and requires a `Scope`.
 * - `withResource` runs `f` with a pooled resource.
 *
 * NOTE(review): the base-class expression references the class type parameter
 * `R`; TypeScript normally rejects that (TS2562), so a generic tag may need to
 * be replaced by a plain interface or a tag factory — confirm this compiles.
 */
class ResourcePool<R> extends Context.Tag("ResourcePool")<
  ResourcePool<R>,
  {
    acquire: () => Effect.Effect<R, never, Scope.Scope>
    withResource: <A, E>(f: (resource: R) => Effect.Effect<A, E>) => Effect.Effect<A, E>
  }
>() {}

/**
 * Creates a fixed-size pool of resources tied to the surrounding scope.
 *
 * `size` resources are created eagerly and parked in a bounded queue. When the
 * scope closes, every resource currently idle in the queue is destroyed and
 * the queue is shut down.
 *
 * NOTE(review): the declared result type is the tag identifier
 * `ResourcePool<R>`, while the value produced by `ResourcePool.of` is the
 * service object — confirm the intended return type.
 *
 * @param create - Builds one resource; a failure aborts pool construction.
 * @param destroy - Releases one resource.
 * @param size - Number of resources to create and the queue capacity.
 * @returns The pool service with `acquire` and `withResource`.
 */
const makeResourcePool = <R, E>(
  create: () => Effect.Effect<R, E>,
  destroy: (resource: R) => Effect.Effect<void>,
  size: number
): Effect.Effect<ResourcePool<R>, E, Scope.Scope> =>
  Effect.gen(function*() {
    const pool = yield* Queue.bounded<R>(size)

    // Register cleanup BEFORE creating anything, so resources that were
    // already created are still destroyed if a later `create()` fails.
    yield* Effect.addFinalizer(() =>
      // Drain first: shutting the queue down before taking would interrupt
      // the take and the idle resources would never be destroyed.
      Queue.takeAll(pool).pipe(
        Effect.flatMap((idle) =>
          Effect.forEach(idle, (resource) => destroy(resource), { discard: true })
        ),
        Effect.zipRight(Queue.shutdown(pool))
      )
    )

    // Initialize pool
    for (let i = 0; i < size; i++) {
      const resource = yield* create()
      yield* Queue.offer(pool, resource)
    }

    return ResourcePool.of({
      acquire: () =>
        // Waiting for a resource stays interruptible; once one is taken, the
        // finalizer that returns it is registered without an interruption gap.
        Effect.uninterruptibleMask((restore) =>
          Effect.tap(restore(Queue.take(pool)), (resource) =>
            Effect.addFinalizer(() => Queue.offer(pool, resource))
          )
        ),

      withResource: <A, E2>(f: (resource: R) => Effect.Effect<A, E2>) =>
        Effect.uninterruptibleMask((restore) =>
          Effect.flatMap(restore(Queue.take(pool)), (resource) =>
            // `ensuring` returns the resource to the pool whether `f`
            // succeeds, fails, or is interrupted.
            Effect.ensuring(restore(f(resource)), Queue.offer(pool, resource))
          )
        )
    })
  })

// Usage
// `makeResourcePool` requires a `Scope`, so run this program inside one
// (for example via `Effect.scoped`).
const program = Effect.gen(function*() {
  // The value produced here is already the pool service (it comes from
  // `ResourcePool.of`), so it is used directly instead of being yielded again.
  const pool = yield* makeResourcePool(
    () => createDatabaseConnection(),
    (conn) => conn.close(),
    10
  )

  // Use resource from pool
  const result = yield* pool.withResource((conn) =>
    conn.query("SELECT * FROM users")
  )
})

Circuit Breaker

import { Context, Effect, Ref } from "effect"

/**
 * State of the circuit breaker, discriminated by `_tag`.
 *
 * - "Closed": calls are attempted.
 * - "Open": calls are rejected; `openedAt` is the `Date.now()` timestamp
 *   recorded when the circuit opened.
 * - "HalfOpen": entered from "Open" once the reset timeout has elapsed; calls
 *   are attempted again.
 */
type CircuitState =
  | { _tag: "Closed" }
  | { _tag: "Open"; openedAt: number }
  | { _tag: "HalfOpen" }

/**
 * Service tag for a circuit breaker.
 *
 * `execute` wraps an effect; the result keeps the effect's success and
 * requirement types and widens the error type with `Error`.
 */
class CircuitBreaker extends Context.Tag("CircuitBreaker")<
  CircuitBreaker,
  {
    execute: <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E | Error, R>
  }
>() {}

/**
 * Creates a circuit breaker service.
 *
 * Consecutive failures are counted; once the count reaches
 * `options.maxFailures` the circuit opens and calls fail immediately with
 * `Error("Circuit breaker is OPEN")`. After more than `options.resetTimeout`
 * milliseconds (measured with `Date.now()`), the next call moves the circuit
 * to "HalfOpen" and is attempted. Any successful call resets the failure count
 * and closes the circuit.
 *
 * @param options.maxFailures - Failure count at which the circuit opens.
 * @param options.resetTimeout - Milliseconds the circuit stays open before a
 *   call is attempted again.
 * @returns An effect producing the `CircuitBreaker` service implementation
 *   (the value built by `CircuitBreaker.of`).
 */
const makeCircuitBreaker = (
  options: {
    maxFailures: number
    resetTimeout: number
  }
): Effect.Effect<Context.Tag.Service<CircuitBreaker>> =>
  Effect.gen(function*() {
    const state = yield* Ref.make<CircuitState>({ _tag: "Closed" })
    const failures = yield* Ref.make(0)

    // A success clears the failure count and closes the circuit.
    const recordSuccess = Effect.gen(function*() {
      yield* Ref.set(failures, 0)
      yield* Ref.set(state, { _tag: "Closed" })
    })

    // A failure bumps the count and opens the circuit at the threshold.
    const recordFailure = Effect.gen(function*() {
      const count = yield* Ref.updateAndGet(failures, (n) => n + 1)

      if (count >= options.maxFailures) {
        yield* Ref.set(state, {
          _tag: "Open",
          openedAt: Date.now()
        })
      }
    })

    // The effect is passed in explicitly; it is not otherwise in scope here.
    const attemptCall = <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
      effect.pipe(
        Effect.tap(() => recordSuccess),
        Effect.tapError(() => recordFailure)
      )

    const execute = <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E | Error, R> =>
      Effect.gen(function*() {
        const currentState = yield* Ref.get(state)

        if (currentState._tag === "Open") {
          const now = Date.now()
          if (now - currentState.openedAt <= options.resetTimeout) {
            return yield* Effect.fail(new Error("Circuit breaker is OPEN"))
          }
          // Reset timeout elapsed: allow a trial call.
          yield* Ref.set(state, { _tag: "HalfOpen" })
        }

        // "Closed", "HalfOpen", and a just-expired "Open" all attempt the call.
        return yield* attemptCall(effect)
      })

    return CircuitBreaker.of({ execute })
  })

Effect Aspects

Create reusable cross-cutting concerns:
import { Effect, Metric } from "effect"

/**
 * Builds a reusable aspect that instruments an effect with metrics and a span.
 *
 * For a given `name` it records:
 * - `${name}_duration`: a timer updated with the effect's run time,
 * - `${name}_calls`: a counter incremented by 1 on every success,
 * - `${name}_errors`: a counter incremented by 1 on every failure,
 * and wraps the effect in a span called `name`.
 *
 * @param name - Prefix for the metric names and the span name.
 * @returns A function that instruments any effect without changing its types.
 */
const tracked = (name: string) => {
  const counter = Metric.counter(`${name}_calls`)
  const duration = Metric.timer(`${name}_duration`)
  const errors = Metric.counter(`${name}_errors`)

  // The type parameters live on the returned function so they are inferred
  // from the effect it is applied to, not at the `tracked(name)` call.
  return <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
    effect.pipe(
      Metric.trackDuration(duration),
      // The `...With` variants map each success / error to a constant 1, so
      // the counters count occurrences instead of receiving the raw value.
      Metric.trackSuccessWith(counter, () => 1),
      Metric.trackErrorWith(errors, () => 1),
      Effect.withSpan(name)
    )
}

// Usage
const fetchUser = (id: number) => {
  // Stand-in for a real lookup: always succeeds with a fixed user record.
  const lookup = Effect.succeed({ id, name: "Alice" })

  return lookup.pipe(tracked("fetchUser"))
}
When building custom operators, use Effect.gen for complex logic and leverage existing Effect operators as building blocks.
Custom effect types should maintain referential transparency. Avoid side effects outside of Effect constructors.

Build docs developers (and LLMs) love