Skip to main content
A semaphore is a synchronization mechanism used to manage access to a shared resource. In Effect, semaphores help control resource access or coordinate tasks within asynchronous, concurrent operations. A semaphore acts as a generalized mutex, allowing a set number of permits to be held and released concurrently. Permits act like tickets, giving tasks or fibers controlled access to a shared resource. When no permits are available, tasks trying to acquire one will wait until a permit is released.

Type

/**
 * A permit-based synchronization primitive: a fixed number of permits can be
 * held and released concurrently, and tasks that cannot obtain a permit wait
 * until one is released.
 */
interface Semaphore {
  /** Adjusts the number of permits available in the semaphore. */
  resize(permits: number): Effect<void>
  /**
   * Runs `self` while holding the given number of permits and releases them
   * when the effect completes.
   */
  withPermits(permits: number): <A, E, R>(self: Effect<A, E, R>) => Effect<A, E, R>
  /**
   * Runs `self` only if the given number of permits are immediately
   * available. The result is wrapped in an `Option`; the example on this page
   * shows `None` when the permits could not be obtained.
   */
  withPermitsIfAvailable(permits: number): <A, E, R>(self: Effect<A, E, R>) => Effect<Option.Option<A>, E, R>
  /**
   * Acquires the given number of permits, suspending if they are not
   * available.
   */
  take(permits: number): Effect<number>
  /** Releases the given number of permits. */
  release(permits: number): Effect<number>
  /** Releases all permits held by the semaphore. */
  releaseAll: Effect<number>
}

Creating Semaphores

makeSemaphore

Creates a new semaphore with the specified number of permits.
import { Effect } from "effect"

// Allocate a semaphore with three permits; nothing else happens in this example.
const program = Effect.makeSemaphore(3).pipe(
  Effect.map((semaphore) => {
    // `semaphore` can be used from here to limit concurrent access
  })
)

unsafeMakeSemaphore

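Unsafely creates a new semaphore: unlike `makeSemaphore`, it returns the `Semaphore` directly instead of wrapping its creation in an `Effect`.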
/**
 * Unsafely creates a new semaphore with the given number of permits.
 *
 * Returns the `Semaphore` itself rather than an `Effect` that produces one.
 * Declared with `declare` because this is a signature only: a plain `const`
 * without an initializer does not compile.
 */
declare const unsafeMakeSemaphore: (permits: number) => Semaphore

Using Permits

withPermits

Runs an effect with the given number of permits and releases them when the effect completes.
import { Effect } from "effect"

const program = Effect.gen(function* () {
  // Two permits in total, one per task => only 2 tasks can run concurrently.
  const gate = yield* Effect.makeSemaphore(2)
  const holdOnePermit = gate.withPermits(1)

  // A task announces itself, idles for a second, then reports completion,
  // holding one permit for its whole duration.
  const task = (id: number) =>
    Effect.gen(function* () {
      console.log(`Task ${id} started`)
      yield* Effect.sleep("1 second")
      console.log(`Task ${id} completed`)
    }).pipe(holdOnePermit)

  // Start all four at once; the semaphore decides who actually proceeds.
  yield* Effect.all([1, 2, 3, 4].map((id) => task(id)), {
    concurrency: "unbounded"
  })
})

withPermitsIfAvailable

Runs an effect only if the specified number of permits are immediately available.
import { Effect, Option } from "effect"

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(1)

  // Reusable pieces: the non-blocking guard and the effect it protects.
  const runIfFree = semaphore.withPermitsIfAvailable(1)
  const job = Effect.succeed("task result")

  // Hold the only permit so the first attempt finds none available.
  yield* semaphore.take(1)
  const whileHeld = yield* runIfFree(job)
  console.log(Option.isNone(whileHeld)) // true

  // Hand the permit back; the second attempt can now run the job.
  yield* semaphore.release(1)
  const afterRelease = yield* runIfFree(job)
  console.log(afterRelease) // Option.some("task result")
})

Manual Permit Control

take

Acquires the specified number of permits, suspending if they are not available.
import { Effect } from "effect"

const program = Effect.makeSemaphore(5).pipe(
  // Acquire 3 of the 5 permits; this suspends if they are not available.
  Effect.flatMap((semaphore) => semaphore.take(3)),
  Effect.map((taken) => {
    // This page originally annotated the output as "2 (5 - 3)".
    // NOTE(review): the runtime may instead resolve `take` with the number of
    // permits acquired (3) — confirm against the installed `effect` version.
    console.log(taken)
  })
)

release

Releases the specified number of permits.
import { Effect } from "effect"

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(5)

  // Take three permits, then give the same three back; keep only the value
  // produced by the release step.
  const afterRelease = yield* Effect.zipRight(
    semaphore.take(3),
    semaphore.release(3)
  )
  console.log(afterRelease) // 5
})

releaseAll

Releases all permits held by the semaphore.
import { Effect } from "effect"

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(10)

  // Take seven permits, then release everything in one step; keep only the
  // value produced by `releaseAll`.
  const afterReset = yield* Effect.zipRight(
    semaphore.take(7),
    semaphore.releaseAll
  )
  console.log(afterReset) // 10
})

resize

Adjusts the number of permits available in the semaphore.
import { Effect } from "effect"

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(5)

  // Starting from 5 permits: first increase capacity to 10, then decrease it to 3.
  for (const capacity of [10, 3]) {
    yield* semaphore.resize(capacity)
  }
})

Rate Limiting Example

import { Effect } from "effect"

/**
 * Builds a fixed-window rate limiter: at most `requestsPerSecond` effects are
 * admitted between two refills, and refills happen once per second.
 *
 * Each call to `execute` consumes one permit and does NOT return it when the
 * effect finishes. Guarding with `withPermits(1)` would release the permit on
 * completion, which only bounds concurrency: short effects could then run far
 * more often than `requestsPerSecond`. Permits come back only through the
 * background refill below.
 *
 * The refill loop is started with `Effect.fork`, so it runs in the background
 * of whichever fiber evaluates this effect.
 *
 * @param requestsPerSecond number of effects admitted per one-second window
 * @returns an effect producing `{ execute }`, where `execute(effect)` waits
 *          for a permit and then runs `effect` unchanged
 */
const makeRateLimiter = (requestsPerSecond: number) =>
  Effect.gen(function* () {
    const semaphore = yield* Effect.makeSemaphore(requestsPerSecond)

    // Refill permits every second
    yield* Effect.fork(
      Effect.forever(
        Effect.gen(function* () {
          yield* Effect.sleep("1 second")
          yield* semaphore.releaseAll
        })
      )
    )

    return {
      execute: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
        // Spend one permit for this window, then run the effect; the permit
        // is deliberately not released here (see the refill loop above).
        Effect.zipRight(semaphore.take(1), effect)
    }
  })

const program = Effect.gen(function* () {
  // Limiter configured for 5 requests per second.
  const limiter = yield* makeRateLimiter(5)

  // One simulated API call: log a timestamp, spend 100 ms, yield a label.
  const apiCall = (id: number) =>
    limiter.execute(
      Effect.sync(() => console.log(`API call ${id} at ${Date.now()}`)).pipe(
        Effect.zipRight(Effect.sleep("100 millis")),
        Effect.as(`Result ${id}`)
      )
    )

  // Submit calls 1..10 through the limiter all at once.
  const callIds = Array.from({ length: 10 }, (_, i) => i + 1)
  const results = yield* Effect.forEach(callIds, (id) => apiCall(id), {
    concurrency: "unbounded"
  })

  console.log(results)
})

Resource Pool Example

import { Effect } from "effect"

/** Minimal database handle used by the pool example: a single `query` operation. */
interface Database {
  // Takes SQL text and returns an effect that yields the result as a string.
  query: (sql: string) => Effect.Effect<string>
}

/**
 * Creates a simulated connection pool in which at most `maxConnections`
 * callers use the connection at the same time.
 *
 * @param maxConnections number of permits backing the pool
 * @returns an effect producing `{ withConnection }`; `withConnection(f)` runs
 *          `f(connection)` while holding one permit
 */
const makeDatabasePool = (maxConnections: number) =>
  Effect.makeSemaphore(maxConnections).pipe(
    Effect.map((semaphore) => {
      // Stand-in connection: every query takes 200 ms and echoes its SQL.
      const connection: Database = {
        query: (sql: string) =>
          Effect.sleep("200 millis").pipe(Effect.as(`Result for: ${sql}`))
      }

      const leaseOne = semaphore.withPermits(1)

      return {
        withConnection: <A, E, R>(
          f: (db: Database) => Effect.Effect<A, E, R>
        ) => leaseOne(f(connection))
      }
    })
  )

const program = Effect.gen(function* () {
  // Pool limited to 3 concurrent connections.
  const pool = yield* makeDatabasePool(3)

  // Look up one user by id, logging when the query starts and finishes.
  const query = (id: number) =>
    pool.withConnection((db) =>
      Effect.gen(function* () {
        console.log(`Query ${id} started`)
        const row = yield* db.query(`SELECT * FROM users WHERE id = ${id}`)
        console.log(`Query ${id} completed`)
        return row
      })
    )

  // Fire queries for ids 1..10 together; the pool caps how many run at once.
  const userIds = Array.from({ length: 10 }, (_, i) => i + 1)
  const results = yield* Effect.forEach(userIds, (id) => query(id), {
    concurrency: "unbounded"
  })

  console.log(results)
})

Batch Processing Example

import { Effect, Chunk } from "effect"

/**
 * Runs `process` over every item of `items`, allowing at most
 * `maxConcurrent` invocations to be in flight at once.
 *
 * @param items         the chunk of items to process
 * @param maxConcurrent number of permits, i.e. the concurrency ceiling
 * @param process       effectful handler applied to each item
 * @returns an effect that finishes once every item has been processed
 */
const processBatch = <A, E, R>(
  items: Chunk.Chunk<A>,
  maxConcurrent: number,
  process: (item: A) => Effect.Effect<void, E, R>
) =>
  Effect.makeSemaphore(maxConcurrent).pipe(
    Effect.flatMap((semaphore) => {
      const guarded = semaphore.withPermits(1)
      // Every item is started eagerly; the semaphore does the throttling.
      return Effect.forEach(items, (item) => guarded(process(item)), {
        concurrency: "unbounded"
      })
    }),
    Effect.asVoid
  )

// Process the numbers 1..20 with a concurrency ceiling of 5. Each item logs,
// waits 500 ms, then logs again. `suspend` defers building the batch until
// the program actually runs.
const program = Effect.suspend(() =>
  processBatch(Chunk.range(1, 20), 5, (item) =>
    Effect.gen(function* () {
      console.log(`Processing item ${item}`)
      yield* Effect.sleep("500 millis")
      console.log(`Completed item ${item}`)
    })
  )
)

Download Manager Example

import { Effect } from "effect"

/**
 * Creates a download manager that lets at most `maxConcurrent` simulated
 * downloads run at the same time.
 *
 * @param maxConcurrent number of permits, i.e. simultaneous downloads allowed
 * @returns an effect producing `{ download }`; `download(url)` yields the
 *          simulated content string for `url`
 */
const makeDownloadManager = (maxConcurrent: number) =>
  Effect.map(Effect.makeSemaphore(maxConcurrent), (semaphore) => {
    const withSlot = semaphore.withPermits(1)

    // Simulated transfer: log, wait one second, log again, return the content.
    const transfer = (url: string) =>
      Effect.gen(function* () {
        console.log(`Downloading ${url}...`)
        yield* Effect.sleep("1 second")
        console.log(`Downloaded ${url}`)
        return `Content from ${url}`
      })

    return {
      download: (url: string) => withSlot(transfer(url))
    }
  })

const program = Effect.gen(function* () {
  // Manager created with a limit of 3.
  const manager = yield* makeDownloadManager(3)

  const urls = [
    "https://example.com/file1.pdf",
    "https://example.com/file2.pdf",
    "https://example.com/file3.pdf",
    "https://example.com/file4.pdf",
    "https://example.com/file5.pdf"
  ]

  // Request every file at once and collect the contents in input order.
  const contents = yield* Effect.forEach(
    urls,
    (url) => manager.download(url),
    { concurrency: "unbounded" }
  )

  console.log(`Downloaded ${contents.length} files`)
})

Build docs developers (and LLMs) love