Skip to main content

Overview

The effect/unstable/cluster modules enable building distributed applications with:
  • Entities that manage stateful behaviors
  • Location-transparent RPC communication
  • Automatic entity lifecycle management
  • Support for distributed storage

Defining entities

Entities are distributed objects that handle RPC messages:
import { Schema } from "effect"
import { Entity } from "effect/unstable/cluster"
import { Rpc } from "effect/unstable/rpc"

// Define RPC operations
export const Increment = Rpc.make("Increment", {
  payload: { amount: Schema.Number },
  success: Schema.Number
})

export const GetCount = Rpc.make("GetCount", {
  success: Schema.Number
})

// Create entity from RPC definitions
export const Counter = Entity.make("Counter", [Increment, GetCount])

Implementing entity handlers

Provide implementations for each RPC operation:
import { Effect, Layer, Ref } from "effect"

export const CounterEntityLayer = Counter.toLayer(
  Effect.gen(function*() {
    const count = yield* Ref.make(0)

    return Counter.of({
      Increment: ({ payload }) =>
        Ref.updateAndGet(count, (current) => current + payload.amount),
      
      GetCount: () => Ref.get(count).pipe(Rpc.fork)
    })
  }),
  { maxIdleTime: "5 minutes" }
)

Concurrent handlers

By default, handlers run sequentially per entity. Use Rpc.fork to opt into concurrency:
GetCount: () =>
  Ref.get(count).pipe(
    Rpc.fork  // Allow concurrent reads
  )

Persisting RPC messages

Mark RPCs as persistent to survive restarts:
import { ClusterSchema } from "effect/unstable/cluster"

export const GetCount = Rpc.make("GetCount", {
  success: Schema.Number
})
  .annotate(ClusterSchema.Persisted, true)

Using entity clients

Call entity operations from anywhere:
import { Effect } from "effect"

const program = Effect.gen(function*() {
  const clientFor = yield* Counter.client
  const counter = clientFor("counter-123")

  const afterIncrement = yield* counter.Increment({ amount: 1 })
  const currentCount = yield* counter.GetCount()

  yield* Effect.log(`After increment: ${afterIncrement}`)
  yield* Effect.log(`Current count: ${currentCount}`)
})

Setting up a cluster

Production cluster

import { Layer } from "effect"
import { NodeClusterSocket } from "@effect/platform-node"
import type { SqlClient } from "effect/unstable/sql"

declare const SqlClientLayer: Layer.Layer<SqlClient.SqlClient>

const ClusterLayer = NodeClusterSocket.layer().pipe(
  Layer.provide(SqlClientLayer)
)

const EntitiesLayer = Layer.mergeAll(
  CounterEntityLayer,
  // ... other entity layers
)

const ProductionLayer = EntitiesLayer.pipe(
  Layer.provide(ClusterLayer)
)

Test cluster

Use TestRunner for local development and testing:
import { TestRunner } from "effect/unstable/cluster"

const TestClusterLayer = TestRunner.layer

export const TestLayer = EntitiesLayer.pipe(
  Layer.provideMerge(TestClusterLayer)
)

Running the cluster

import { NodeRuntime } from "@effect/platform-node"
import { Layer } from "effect"

Layer.launch(ProductionLayer).pipe(
  NodeRuntime.runMain
)

Complete example

import { NodeClusterSocket, NodeRuntime } from "@effect/platform-node"
import { Effect, Layer, Ref, Schema } from "effect"
import { ClusterSchema, Entity, TestRunner } from "effect/unstable/cluster"
import { Rpc } from "effect/unstable/rpc"
import type { SqlClient } from "effect/unstable/sql"

export const Increment = Rpc.make("Increment", {
  payload: { amount: Schema.Number },
  success: Schema.Number
})

export const GetCount = Rpc.make("GetCount", {
  success: Schema.Number
})
  .annotate(ClusterSchema.Persisted, true)

export const Counter = Entity.make("Counter", [Increment, GetCount])

export const CounterEntityLayer = Counter.toLayer(
  Effect.gen(function*() {
    const count = yield* Ref.make(0)

    return Counter.of({
      Increment: ({ payload }) =>
        Ref.updateAndGet(count, (current) => current + payload.amount),
      
      GetCount: () =>
        Ref.get(count).pipe(Rpc.fork)
    })
  }),
  { maxIdleTime: "5 minutes" }
)

export const useCounter = Effect.gen(function*() {
  const clientFor = yield* Counter.client
  const counter = clientFor("counter-123")

  const afterIncrement = yield* counter.Increment({ amount: 1 })
  const currentCount = yield* counter.GetCount()

  console.log(`Count after increment: ${afterIncrement}, current count: ${currentCount}`)
})

declare const SqlClientLayer: Layer.Layer<SqlClient.SqlClient>

const ClusterLayer = NodeClusterSocket.layer().pipe(
  Layer.provide(SqlClientLayer)
)

const ClusterLayerTest = TestRunner.layer

const EntitiesLayer = Layer.mergeAll(
  CounterEntityLayer
)

const ProductionLayer = EntitiesLayer.pipe(
  Layer.provide(ClusterLayer)
)

export const TestLayer = EntitiesLayer.pipe(
  Layer.provideMerge(ClusterLayerTest)
)

Layer.launch(ProductionLayer).pipe(
  NodeRuntime.runMain
)

Entity lifecycle

  • Entities are created on-demand when messages arrive
  • Idle entities are passivated after maxIdleTime
  • Entities resume on the next message
  • State is reconstructed from persistent messages

See also

  • RPC - Define RPC operations
  • Workflow - Orchestrate long-running workflows

Build docs developers (and LLMs) love