Skip to main content
The Cluster module is marked as unstable, meaning its APIs may change in minor version releases. Use caution when upgrading Effect versions.

Overview

The effect/unstable/cluster module provides primitives for building distributed applications with Effect. It implements sharding, entity management, message passing, and distributed coordination patterns for scalable, fault-tolerant systems.

Installation

npm install effect

Key Modules

Sharding

The core service for distributed entity management and message routing.
import { Effect } from "effect"
import { Sharding, Entity, Message } from "effect/unstable/cluster"

// Define an entity
const UserEntity = Entity.make("User", {
  // Entity state and behavior
  GetProfile: Message.make<{ userId: string }, { name: string; email: string }>()
})

// Use sharding to route messages
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  
  // Create a client for the entity
  const client = yield* sharding.makeClient(UserEntity)
  
  // Send a message to an entity
  const profile = yield* client.send("user-123", UserEntity.GetProfile)
  
  return profile
})
Key Functions:
  • makeClient(entity) - Create a client for sending messages to entities
  • getShardId(entityId, group) - Get the shard ID for an entity
  • hasShardId(shardId) - Check if this runner owns a shard
  • getSnowflake - Generate unique distributed IDs
  • isShutdown - Check if sharding is shutting down

Entity

Define stateful entities that can be distributed across a cluster.
import { Effect, Schema } from "effect"
import { Entity, Message, Reply } from "effect/unstable/cluster"

// Define entity messages
const Increment = Message.make<{ amount: number }, number>()
const GetValue = Message.make<void, number>()

// Define entity state
interface CounterState {
  value: number
}

// Create entity
const CounterEntity = Entity.make("Counter", {
  Increment,
  GetValue
}).pipe(
  Entity.withState<CounterState>({ value: 0 }),
  Entity.withHandlers({
    Increment: (state, { amount }) => 
      Effect.gen(function*() {
        const newValue = state.value + amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    GetValue: (state) => 
      Effect.succeed(Reply.success(state.value))
  })
)
Entity Concepts:
  • EntityAddress - Unique address for routing messages to entities
  • EntityId - Identifier for entity instances
  • EntityType - Type definition for entities
  • EntityProxy - Client-side proxy for entity communication
  • EntityResource - Managed resource for entity lifecycle

Message & Reply

Type-safe message passing between entities.
import { Message, Reply } from "effect/unstable/cluster"
import { Schema } from "effect"

// Define a message with schema validation
const CreateUser = Message.make<
  { name: string; email: string },
  { id: string; createdAt: Date }
>()

// Define replies
const successReply = Reply.success({ id: "123", createdAt: new Date() })
const errorReply = Reply.error(new Error("User already exists"))

// Check reply type
if (Reply.isSuccess(successReply)) {
  console.log(successReply.value)
}

Runners

Manage runner nodes that host entities in the cluster.
import { Effect, Layer } from "effect"
import { 
  Sharding, 
  Runner, 
  RunnerStorage,
  HttpRunner 
} from "effect/unstable/cluster"

// Create an HTTP-based runner
const runnerLayer = Layer.mergeAll(
  HttpRunner.layer,
  RunnerStorage.memory
)

// Start the runner
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const runner = yield* Runner.Runner
  
  // Register entities with the runner
  yield* runner.register(CounterEntity)
  
  // Runner will now host Counter entities
  yield* Effect.never // Keep running
}).pipe(
  Effect.provide(runnerLayer)
)
Runner Types:
  • HttpRunner - HTTP-based runner communication
  • SocketRunner - WebSocket-based runner communication
  • SingleRunner - Single-node runner for development
  • TestRunner - In-memory runner for testing

MessageStorage

Persistent message storage for reliable delivery.
import { MessageStorage, SqlMessageStorage } from "effect/unstable/cluster"

// SQL-based message storage
const messageStorageLayer = SqlMessageStorage.layer

// Use with sharding
const program = Effect.provide(shardingProgram, messageStorageLayer)

RunnerStorage

Store runner registration and health information.
import { RunnerStorage, SqlRunnerStorage } from "effect/unstable/cluster"

// SQL-based runner storage
const runnerStorageLayer = SqlRunnerStorage.layer

Singleton

Manage singleton entities that exist exactly once in the cluster.
import { Effect } from "effect"
import { Singleton, SingletonAddress } from "effect/unstable/cluster"

// Define a singleton service
const CronScheduler = Singleton.make("CronScheduler", {
  schedule: (job: string) => Effect.log(`Scheduling ${job}`),
  cancel: (jobId: string) => Effect.log(`Canceling ${jobId}`)
})

// Access the singleton
const program = Effect.gen(function*() {
  const scheduler = yield* CronScheduler
  yield* scheduler.schedule("daily-backup")
})

ClusterCron

Schedule distributed cron jobs across the cluster.
import { Effect, Schedule } from "effect"
import { ClusterCron } from "effect/unstable/cluster"

// Schedule a recurring task
const cronJob = ClusterCron.make(
  "backup-job",
  Schedule.fixed("1 hour"),
  Effect.log("Running backup...")
)

const program = Effect.gen(function*() {
  yield* ClusterCron.schedule(cronJob)
})

ShardingConfig

Configure sharding behavior.
import { ShardingConfig } from "effect/unstable/cluster"

const config = ShardingConfig.make({
  numberOfShards: 256,
  rebalanceInterval: Duration.minutes(5),
  entityTimeout: Duration.minutes(10),
  entityMaxIdle: Duration.minutes(2)
})

const shardingLayer = Sharding.layer.pipe(
  Layer.provide(Layer.succeed(ShardingConfig.ShardingConfig, config))
)

Snowflake

Generate distributed unique IDs.
import { Effect } from "effect"
import { Snowflake } from "effect/unstable/cluster"

// Generate a Snowflake ID
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const id = yield* sharding.getSnowflake
  
  console.log(id.value) // Unique distributed ID
  console.log(id.timestamp) // Timestamp component
  console.log(id.machineId) // Machine ID component
  console.log(id.sequence) // Sequence component
  
  return id
})

ClusterError

Error types for cluster operations.
import { Match } from "effect"
import type { ClusterError } from "effect/unstable/cluster"

const handleClusterError = Match.type<ClusterError>().pipe(
  Match.when(
    { _tag: "EntityNotAssignedToRunner" },
    (err) => console.error(`Entity ${err.entityId} not on this runner`)
  ),
  Match.when(
    { _tag: "MailboxFull" },
    (err) => console.error(`Mailbox full for ${err.entityId}`)
  ),
  Match.when(
    { _tag: "PersistenceError" },
    (err) => console.error(`Persistence failed: ${err.message}`)
  ),
  Match.orElse((err) => console.error(`Cluster error: ${err}`))
)

ClusterMetrics

Monitor cluster performance and health.
import { Effect } from "effect"
import { ClusterMetrics } from "effect/unstable/cluster"

const program = Effect.gen(function*() {
  const metrics = yield* ClusterMetrics.ClusterMetrics
  
  // Get cluster metrics
  const entityCount = yield* metrics.entityCount
  const messageRate = yield* metrics.messageRate
  const activeShards = yield* metrics.activeShards
  
  return { entityCount, messageRate, activeShards }
})

ClusterWorkflowEngine

Execute distributed workflows across the cluster.
import { Effect } from "effect"
import { ClusterWorkflowEngine } from "effect/unstable/cluster"

// Define a workflow
const orderWorkflow = ClusterWorkflowEngine.defineWorkflow(
  "order-processing",
  Effect.gen(function*() {
    yield* Effect.log("Step 1: Validate order")
    yield* Effect.sleep("1 second")
    
    yield* Effect.log("Step 2: Charge payment")
    yield* Effect.sleep("1 second")
    
    yield* Effect.log("Step 3: Ship order")
    yield* Effect.sleep("1 second")
    
    return { status: "completed" }
  })
)

// Execute workflow
const program = Effect.gen(function*() {
  const engine = yield* ClusterWorkflowEngine.ClusterWorkflowEngine
  const result = yield* engine.execute(orderWorkflow, { orderId: "123" })
  return result
})

Complete Example

Here’s a complete distributed counter application:
import { Effect, Layer, Schedule } from "effect"
import {
  Entity,
  Message,
  Reply,
  Sharding,
  HttpRunner,
  RunnerStorage,
  MessageStorage
} from "effect/unstable/cluster"

// Define messages
const Increment = Message.make<{ amount: number }, number>()
const Decrement = Message.make<{ amount: number }, number>()
const GetValue = Message.make<void, number>()

// Define entity
const CounterEntity = Entity.make("Counter", {
  Increment,
  Decrement,
  GetValue
}).pipe(
  Entity.withState<{ value: number }>({ value: 0 }),
  Entity.withHandlers({
    Increment: (state, { amount }) =>
      Effect.gen(function*() {
        const newValue = state.value + amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    Decrement: (state, { amount }) =>
      Effect.gen(function*() {
        const newValue = state.value - amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    GetValue: (state) =>
      Effect.succeed(Reply.success(state.value))
  })
)

// Setup cluster
const clusterLayer = Layer.mergeAll(
  HttpRunner.layer,
  RunnerStorage.memory,
  MessageStorage.memory
)

// Application
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const client = yield* sharding.makeClient(CounterEntity)
  
  // Increment counter
  const value1 = yield* client.send("counter-1", { amount: 5 })
  console.log(`After increment: ${value1}`)
  
  // Decrement counter
  const value2 = yield* client.send("counter-1", { amount: 2 })
  console.log(`After decrement: ${value2}`)
  
  // Get current value
  const value3 = yield* client.send("counter-1", GetValue)
  console.log(`Current value: ${value3}`)
  
  return value3
}).pipe(
  Effect.provide(clusterLayer)
)

// Run
Effect.runPromise(program)

Best Practices

  1. Sharding - Design entity IDs for even distribution across shards
  2. State Management - Keep entity state small and serializable
  3. Message Design - Use schemas for type-safe message validation
  4. Error Handling - Handle cluster errors gracefully with retries
  5. Monitoring - Use ClusterMetrics for observability
  6. Persistence - Choose appropriate storage for messages and state
  7. Testing - Use TestRunner for unit testing entity behavior
  • SQL - Database integration for persistence
  • AI - AI and LLM integration
  • Process - Child process management

Build docs developers (and LLMs) love