@effect/cluster provides unified interfaces for building distributed systems with Effect, enabling sharding, distributed state management, and cluster coordination.

Installation

npm install effect @effect/cluster @effect/platform @effect/rpc @effect/sql

Core Concepts

@effect/cluster enables you to build resilient distributed systems by providing:
  • Sharding: Distribute entities across cluster nodes
  • Message Passing: Type-safe communication between entities
  • Persistence: Durable state management
  • Cluster Coordination: Node discovery and health monitoring
  • Load Balancing: Automatic entity distribution
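
To make the sharding idea concrete, here is a minimal, dependency-free sketch of one common scheme: hash the entity ID to a shard, then map shards to nodes. The names `hashString`, `shardFor`, and `nodeFor`, the shard count of 300, and the modulo placement are assumptions made for illustration. They are not taken from @effect/cluster, which may assign shards differently.

```typescript
// Illustrative only: NOT the @effect/cluster implementation.

// FNV-1a 32-bit hash of a string, so every node computes the same value.
function hashString(input: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash >>> 0
}

// Map an entity ID to a shard number in [0, shardCount).
function shardFor(entityId: string, shardCount: number): number {
  return hashString(entityId) % shardCount
}

// Map a shard to a node using simple modulo placement (hypothetical policy).
function nodeFor(shard: number, nodes: ReadonlyArray<string>): string {
  if (nodes.length === 0) throw new Error("no nodes available")
  return nodes[shard % nodes.length]
}

const nodes = ["node1:8080", "node2:8080"]
const shard = shardFor("counter-1", 300)
console.log(`counter-1 -> shard ${shard} -> ${nodeFor(shard, nodes)}`)
```

Because the hash is deterministic, every node resolves the same entity ID to the same shard without coordination. Only the shard-to-node mapping has to change when nodes join or leave.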

Quick Start

Define an Entity

Create a sharded entity using Effect:
import { Effect, Schema } from "effect"
import { ShardingEntity } from "@effect/cluster"

class Counter extends ShardingEntity.make("Counter", {
  Increment: Schema.Struct({
    amount: Schema.Number
  }).pipe(Schema.withResult(Schema.Number)),
  
  GetValue: Schema.Struct({}).pipe(
    Schema.withResult(Schema.Number)
  )
}) {
  initialState = 0

  behavior = (entityId: string, message: Counter.Message) =>
    Effect.gen(this, function* () {
      const state = yield* this.state

      switch (message._tag) {
        case "Increment": {
          // Braces give `newValue` its own block scope inside the switch
          const newValue = state + message.amount
          yield* this.state.set(newValue)
          return newValue
        }

        case "GetValue":
          return state
      }
    })
}

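Start a Cluster Node

Provide the sharding layer with a PostgreSQL client (from @effect/sql-pg, installed separately), register the Counter entity defined above, and send it a message: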
import { Effect, Layer } from "effect"
import { Sharding, ShardingConfig } from "@effect/cluster"
import { PgClient } from "@effect/sql-pg"

const ShardingLive = Sharding.live.pipe(
  Layer.provide(ShardingConfig.defaults),
  Layer.provide(PgClient.layer({ database: "cluster_db" }))
)

const program = Effect.gen(function* () {
  const sharding = yield* Sharding.Sharding
  
  // Register entity type
  yield* sharding.registerEntity(Counter)
  
  // Send messages to entities
  const messenger = yield* sharding.messenger(Counter)
  
  const result = yield* messenger.send(
    "counter-1",
    new Counter.Increment({ amount: 5 })
  )
  
  yield* Effect.log(`Counter value: ${result}`)
})

Effect.runPromise(
  program.pipe(Effect.provide(ShardingLive))
)

Persistence

Entities can persist state to a database:
import { Effect, Schema } from "effect"
import { ShardingEntity } from "@effect/cluster"

class PersistentCounter extends ShardingEntity.make("PersistentCounter", {
  // Message definitions...
}) {
  // Enable persistence
  persistence = true
  
  // State schema for serialization
  stateSchema = Schema.Number
  
  initialState = 0
  
  behavior = (entityId: string, message: PersistentCounter.Message) =>
    Effect.gen(this, function* () {
      // State automatically persisted after each message
      const state = yield* this.state
      // Handle messages...
    })
}
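
As a rough picture of what a state schema does for persistence, the sketch below serializes a numeric state for storage and validates it when reading it back. The helpers `encodeState` and `decodeState` and the JSON envelope are hypothetical and hand-rolled for illustration. They do not show how @effect/cluster stores state.

```typescript
// Illustrative only: a hand-rolled stand-in for schema-based (de)serialization.

// Serialize a numeric counter state into a string suitable for a database column.
function encodeState(state: number): string {
  return JSON.stringify({ value: state })
}

// Decode a stored value. A missing row (null) yields the initial state;
// anything that is not `{ value: number }` is rejected.
function decodeState(raw: string | null, initialState: number): number {
  if (raw === null) return initialState
  const parsed: unknown = JSON.parse(raw)
  if (
    typeof parsed === "object" &&
    parsed !== null &&
    "value" in parsed &&
    typeof (parsed as { value: unknown }).value === "number"
  ) {
    return (parsed as { value: number }).value
  }
  throw new Error(`Malformed persisted state: ${raw}`)
}

// An in-memory Map stands in for the database table.
const store = new Map<string, string>()
store.set("counter-1", encodeState(5))
console.log(decodeState(store.get("counter-1") ?? null, 0))
console.log(decodeState(store.get("counter-2") ?? null, 0))
```

Validating on read means a corrupted or outdated row fails loudly instead of silently becoming the entity's state.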

Message Broadcasting

Send messages to several entities concurrently by combining the individual sends with Effect.all:
import { Effect } from "effect"
import { Sharding } from "@effect/cluster"

const program = Effect.gen(function* () {
  const sharding = yield* Sharding.Sharding
  const messenger = yield* sharding.messenger(Counter)
  
  // Broadcast to multiple entities
  const results = yield* Effect.all(
    [
      messenger.send("counter-1", new Counter.Increment({ amount: 1 })),
      messenger.send("counter-2", new Counter.Increment({ amount: 2 })),
      messenger.send("counter-3", new Counter.Increment({ amount: 3 }))
    ],
    { concurrency: "unbounded" }
  )
  
  return results
})

Cluster Management

Node Discovery

Configure how nodes locate each other, using either Kubernetes service discovery or a static node list:
import { ShardingConfig } from "@effect/cluster"

const config = ShardingConfig.make({
  // Kubernetes service discovery
  discovery: "kubernetes",
  namespace: "my-namespace",
  podName: "my-service",
  
  // Or static node list
  // discovery: "static",
  // nodes: ["node1:8080", "node2:8080"]
})

Health Checks

Monitor cluster health:
import { Effect, Schedule } from "effect"
import { Sharding } from "@effect/cluster"

const healthCheck = Effect.gen(function* () {
  const sharding = yield* Sharding.Sharding
  const health = yield* sharding.getHealth
  
  yield* Effect.log(`Cluster nodes: ${health.nodes.length}`)
  yield* Effect.log(`Active shards: ${health.shards}`)
}).pipe(
  Effect.repeat(Schedule.spaced("10 seconds"))
)

Deployment

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: cluster-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: cluster-service
  template:
    metadata:
      labels:
        app: cluster-service
    spec:
      containers:
      - name: app
        image: my-cluster-app:latest
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace

Docker Compose

version: '3.8'
services:
  node1:
    image: my-cluster-app:latest
    environment:
      - NODE_ID=node1
      - CLUSTER_NODES=node1:8080,node2:8080
      
  node2:
    image: my-cluster-app:latest
    environment:
      - NODE_ID=node2
      - CLUSTER_NODES=node1:8080,node2:8080
      
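  postgres:
    image: postgres:16
    environment:
      - POSTGRES_DB=cluster_db
      # The official postgres image refuses to start without a superuser password
      - POSTGRES_PASSWORD=change-me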
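
The Compose file above passes peers to each node as a comma-separated CLUSTER_NODES value. The following sketch shows one way an application might turn that string into a typed node list at startup. The `NodeAddress` type and the `parseClusterNodes` helper are hypothetical names introduced here and are not part of @effect/cluster.

```typescript
// Illustrative only: a hypothetical helper, not an @effect/cluster API.

interface NodeAddress {
  readonly host: string
  readonly port: number
}

// Parse a comma-separated "host:port" list such as "node1:8080,node2:8080".
function parseClusterNodes(value: string): NodeAddress[] {
  return value
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0)
    .map((entry) => {
      const separator = entry.lastIndexOf(":")
      const host = entry.slice(0, separator)
      const port = Number(entry.slice(separator + 1))
      // Reject entries with no host, no port, or a port outside 1..65535.
      if (separator <= 0 || !Number.isInteger(port) || port < 1 || port > 65535) {
        throw new Error(`Invalid node address: "${entry}"`)
      }
      return { host, port }
    })
}

const parsedNodes = parseClusterNodes("node1:8080,node2:8080")
console.log(parsedNodes)
```

In a real service the argument would come from the environment (for example `process.env.CLUSTER_NODES`) rather than a literal. Failing fast on a malformed entry surfaces configuration mistakes at startup instead of at the first remote call.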

Storage Backends

Choose a storage backend for cluster state:

PostgreSQL

Reliable, ACID-compliant storage:
npm install @effect/sql-pg

MySQL

Popular relational database:
npm install @effect/sql-mysql2

SQLite

Lightweight local storage:
npm install @effect/sql-sqlite-node

Integration with Workflows

Combine with @effect/workflow for durable execution:
import { Workflow } from "@effect/workflow"
import { Sharding } from "@effect/cluster"
import { Effect } from "effect"

const orderWorkflow = Workflow.make("OrderWorkflow", {
  execute: (orderId: string) =>
    Effect.gen(function* () {
      const sharding = yield* Sharding.Sharding
      const messenger = yield* sharding.messenger(Counter)
      
      // Workflow steps with entity interaction
      yield* messenger.send(orderId, new Counter.Increment({ amount: 1 }))
      yield* Effect.sleep("1 second")
      // More steps...
    })
})

Best Practices

  1. Entity Design: Keep entities small and focused
  2. Message Immutability: Use immutable message types
  3. Idempotency: Design message handlers to be idempotent
  4. Error Handling: Handle failures gracefully with retries
  5. Monitoring: Use OpenTelemetry for distributed tracing
  6. Testing: Test entities in isolation before clustering
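
Idempotency (item 3) matters because a retried or redelivered message should not be applied twice. The sketch below shows one common approach: remember the result for each message ID and replay it on duplicates. The `messageId` field and the `IdempotentCounter` class are assumptions for illustration. The entity API shown earlier on this page does not define them.

```typescript
// Illustrative only: plain TypeScript, independent of @effect/cluster.

interface IncrementMessage {
  readonly messageId: string // hypothetical unique ID chosen by the sender
  readonly amount: number
}

class IdempotentCounter {
  private value = 0
  // Result recorded for every message ID that has already been applied
  private readonly processed = new Map<string, number>()

  handle(message: IncrementMessage): number {
    const previous = this.processed.get(message.messageId)
    // Duplicate delivery: return the recorded result without changing state
    if (previous !== undefined) return previous
    this.value += message.amount
    this.processed.set(message.messageId, this.value)
    return this.value
  }
}

const counter = new IdempotentCounter()
counter.handle({ messageId: "m-1", amount: 5 })
counter.handle({ messageId: "m-1", amount: 5 }) // redelivery, no effect
console.log(counter.handle({ messageId: "m-2", amount: 1 })) // prints 6
```

The map of processed IDs grows without bound in this sketch. A production handler would typically expire old entries or persist them alongside the entity state.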

API Reference

Complete API documentation

  • @effect/rpc: RPC for inter-service communication
  • @effect/sql: SQL toolkit for persistence
  • @effect/workflow: Durable workflow execution
