The Cluster module is marked as unstable, meaning its APIs may change in minor version releases. Use caution when upgrading Effect versions.
Overview
The effect/unstable/cluster module provides primitives for building distributed applications with Effect. It implements sharding, entity management, message passing, and distributed coordination patterns for scalable, fault-tolerant systems.
Installation
Key Modules
Sharding
The core service for distributed entity management and message routing.
import { Effect } from "effect"
import { Sharding, Entity, Message } from "effect/unstable/cluster"
// Define an entity
const UserEntity = Entity.make("User", {
// Entity state and behavior
GetProfile: Message.make<{ userId: string }, { name: string; email: string }>()
})
// Use sharding to route messages
const program = Effect.gen(function*() {
const sharding = yield* Sharding.Sharding
// Create a client for the entity
const client = yield* sharding.makeClient(UserEntity)
// Send a message to an entity
const profile = yield* client.send("user-123", UserEntity.GetProfile)
return profile
})
Key Functions:
makeClient(entity) - Create a client for sending messages to entities
getShardId(entityId, group) - Get the shard ID for an entity
hasShardId(shardId) - Check if this runner owns a shard
getSnowflake - Generate unique distributed IDs
isShutdown - Check if sharding is shutting down
Entity
Define stateful entities that can be distributed across a cluster.
import { Effect, Schema } from "effect"
import { Entity, Message, Reply } from "effect/unstable/cluster"
// Define entity messages
const Increment = Message.make<{ amount: number }, number>()
const GetValue = Message.make<void, number>()
// Define entity state
interface CounterState {
value: number
}
// Create entity
const CounterEntity = Entity.make("Counter", {
Increment,
GetValue
}).pipe(
Entity.withState<CounterState>({ value: 0 }),
Entity.withHandlers({
Increment: (state, { amount }) =>
Effect.gen(function*() {
const newValue = state.value + amount
yield* Entity.setState({ value: newValue })
return Reply.success(newValue)
}),
GetValue: (state) =>
Effect.succeed(Reply.success(state.value))
})
)
Entity Concepts:
- EntityAddress - Unique address for routing messages to entities
- EntityId - Identifier for entity instances
- EntityType - Type definition for entities
- EntityProxy - Client-side proxy for entity communication
- EntityResource - Managed resource for entity lifecycle
Message & Reply
Type-safe message passing between entities.
import { Message, Reply } from "effect/unstable/cluster"
import { Schema } from "effect"
// Define a message with schema validation
const CreateUser = Message.make<
{ name: string; email: string },
{ id: string; createdAt: Date }
>()
// Define replies
const successReply = Reply.success({ id: "123", createdAt: new Date() })
const errorReply = Reply.error(new Error("User already exists"))
// Check reply type
if (Reply.isSuccess(successReply)) {
console.log(successReply.value)
}
Runners
Manage runner nodes that host entities in the cluster.
import { Effect, Layer } from "effect"
import {
Sharding,
Runner,
RunnerStorage,
HttpRunner
} from "effect/unstable/cluster"
// Create an HTTP-based runner
const runnerLayer = Layer.mergeAll(
HttpRunner.layer,
RunnerStorage.memory
)
// Start the runner
const program = Effect.gen(function*() {
const sharding = yield* Sharding.Sharding
const runner = yield* Runner.Runner
// Register entities with the runner
yield* runner.register(CounterEntity)
// Runner will now host Counter entities
yield* Effect.never // Keep running
}).pipe(
Effect.provide(runnerLayer)
)
Runner Types:
- HttpRunner - HTTP-based runner communication
- SocketRunner - WebSocket-based runner communication
- SingleRunner - Single-node runner for development
- TestRunner - In-memory runner for testing
MessageStorage
Persistent message storage for reliable delivery.
import { MessageStorage, SqlMessageStorage } from "effect/unstable/cluster"
// SQL-based message storage
const messageStorageLayer = SqlMessageStorage.layer
// Use with sharding
const program = Effect.provide(shardingProgram, messageStorageLayer)
RunnerStorage
Store runner registration and health information.
import { RunnerStorage, SqlRunnerStorage } from "effect/unstable/cluster"
// SQL-based runner storage
const runnerStorageLayer = SqlRunnerStorage.layer
Singleton
Manage singleton entities that exist exactly once in the cluster.
import { Effect } from "effect"
import { Singleton, SingletonAddress } from "effect/unstable/cluster"
// Define a singleton service
const CronScheduler = Singleton.make("CronScheduler", {
schedule: (job: string) => Effect.log(`Scheduling ${job}`),
cancel: (jobId: string) => Effect.log(`Canceling ${jobId}`)
})
// Access the singleton
const program = Effect.gen(function*() {
const scheduler = yield* CronScheduler
yield* scheduler.schedule("daily-backup")
})
ClusterCron
Schedule distributed cron jobs across the cluster.
import { Effect, Schedule } from "effect"
import { ClusterCron } from "effect/unstable/cluster"
// Schedule a recurring task
const cronJob = ClusterCron.make(
"backup-job",
Schedule.fixed("1 hour"),
Effect.log("Running backup...")
)
const program = Effect.gen(function*() {
yield* ClusterCron.schedule(cronJob)
})
ShardingConfig
Configure sharding behavior.
import { ShardingConfig } from "effect/unstable/cluster"
const config = ShardingConfig.make({
numberOfShards: 256,
rebalanceInterval: Duration.minutes(5),
entityTimeout: Duration.minutes(10),
entityMaxIdle: Duration.minutes(2)
})
const shardingLayer = Sharding.layer.pipe(
Layer.provide(Layer.succeed(ShardingConfig.ShardingConfig, config))
)
Snowflake
Generate distributed unique IDs.
import { Effect } from "effect"
import { Snowflake } from "effect/unstable/cluster"
// Generate a Snowflake ID
const program = Effect.gen(function*() {
const sharding = yield* Sharding.Sharding
const id = yield* sharding.getSnowflake
console.log(id.value) // Unique distributed ID
console.log(id.timestamp) // Timestamp component
console.log(id.machineId) // Machine ID component
console.log(id.sequence) // Sequence component
return id
})
ClusterError
Error types for cluster operations.
import { Match } from "effect"
import type { ClusterError } from "effect/unstable/cluster"
const handleClusterError = Match.type<ClusterError>().pipe(
Match.when(
{ _tag: "EntityNotAssignedToRunner" },
(err) => console.error(`Entity ${err.entityId} not on this runner`)
),
Match.when(
{ _tag: "MailboxFull" },
(err) => console.error(`Mailbox full for ${err.entityId}`)
),
Match.when(
{ _tag: "PersistenceError" },
(err) => console.error(`Persistence failed: ${err.message}`)
),
Match.orElse((err) => console.error(`Cluster error: ${err}`))
)
ClusterMetrics
Monitor cluster performance and health.
import { Effect } from "effect"
import { ClusterMetrics } from "effect/unstable/cluster"
const program = Effect.gen(function*() {
const metrics = yield* ClusterMetrics.ClusterMetrics
// Get cluster metrics
const entityCount = yield* metrics.entityCount
const messageRate = yield* metrics.messageRate
const activeShards = yield* metrics.activeShards
return { entityCount, messageRate, activeShards }
})
ClusterWorkflowEngine
Execute distributed workflows across the cluster.
import { Effect } from "effect"
import { ClusterWorkflowEngine } from "effect/unstable/cluster"
// Define a workflow
const orderWorkflow = ClusterWorkflowEngine.defineWorkflow(
"order-processing",
Effect.gen(function*() {
yield* Effect.log("Step 1: Validate order")
yield* Effect.sleep("1 second")
yield* Effect.log("Step 2: Charge payment")
yield* Effect.sleep("1 second")
yield* Effect.log("Step 3: Ship order")
yield* Effect.sleep("1 second")
return { status: "completed" }
})
)
// Execute workflow
const program = Effect.gen(function*() {
const engine = yield* ClusterWorkflowEngine.ClusterWorkflowEngine
const result = yield* engine.execute(orderWorkflow, { orderId: "123" })
return result
})
Complete Example
Here’s a complete distributed counter application:
import { Effect, Layer, Schedule } from "effect"
import {
Entity,
Message,
Reply,
Sharding,
HttpRunner,
RunnerStorage,
MessageStorage
} from "effect/unstable/cluster"
// Define messages
const Increment = Message.make<{ amount: number }, number>()
const Decrement = Message.make<{ amount: number }, number>()
const GetValue = Message.make<void, number>()
// Define entity
const CounterEntity = Entity.make("Counter", {
Increment,
Decrement,
GetValue
}).pipe(
Entity.withState<{ value: number }>({ value: 0 }),
Entity.withHandlers({
Increment: (state, { amount }) =>
Effect.gen(function*() {
const newValue = state.value + amount
yield* Entity.setState({ value: newValue })
return Reply.success(newValue)
}),
Decrement: (state, { amount }) =>
Effect.gen(function*() {
const newValue = state.value - amount
yield* Entity.setState({ value: newValue })
return Reply.success(newValue)
}),
GetValue: (state) =>
Effect.succeed(Reply.success(state.value))
})
)
// Setup cluster
const clusterLayer = Layer.mergeAll(
HttpRunner.layer,
RunnerStorage.memory,
MessageStorage.memory
)
// Application
const program = Effect.gen(function*() {
const sharding = yield* Sharding.Sharding
const client = yield* sharding.makeClient(CounterEntity)
// Increment counter
const value1 = yield* client.send("counter-1", { amount: 5 })
console.log(`After increment: ${value1}`)
// Decrement counter
const value2 = yield* client.send("counter-1", { amount: 2 })
console.log(`After decrement: ${value2}`)
// Get current value
const value3 = yield* client.send("counter-1", GetValue)
console.log(`Current value: ${value3}`)
return value3
}).pipe(
Effect.provide(clusterLayer)
)
// Run
Effect.runPromise(program)
Best Practices
- Sharding - Design entity IDs for even distribution across shards
- State Management - Keep entity state small and serializable
- Message Design - Use schemas for type-safe message validation
- Error Handling - Handle cluster errors gracefully with retries
- Monitoring - Use ClusterMetrics for observability
- Persistence - Choose appropriate storage for messages and state
- Testing - Use TestRunner for unit testing entity behavior
- SQL - Database integration for persistence
- AI - AI and LLM integration
- Process - Child process management