Overview
The `effect/unstable/persistence` modules provide abstractions for:
- Key-value storage
- Persisted caches with TTL
- Durable queues
- Rate limiting
- Redis integration
Key-value store
Abstract key-value storage interface:
import { Effect, Layer, Schema, ServiceMap } from "effect"
import { KeyValueStore } from "effect/unstable/persistence"
/**
 * Service that keeps `User` values as JSON strings in the key-value store.
 *
 * - `get(id)` reads the string stored under `id`, parses it as JSON and
 *   decodes it with the `User` schema. A falsy stored value yields `null`.
 * - `set(id, user)` encodes the user with the `User` schema and stores the
 *   `JSON.stringify` result under `id`.
 *
 * Failures from either operation are wrapped in `CacheError`.
 * `User` and `CacheError` are not defined in this snippet.
 */
class UserCache extends ServiceMap.Service<UserCache, {
  get(id: string): Effect.Effect<User | null, CacheError>
  set(id: string, user: User): Effect.Effect<void, CacheError>
}>()("app/UserCache") {
  static readonly layer = Layer.effect(
    UserCache,
    Effect.gen(function*() {
      const store = yield* KeyValueStore.KeyValueStore

      const get = (id: string) =>
        store.get(id).pipe(
          Effect.flatMap((value) =>
            value
              ? Effect.try({
                // JSON.parse throws on malformed input. Running it inside
                // Effect.try moves that throw into the error channel so the
                // mapError below can wrap it in CacheError.
                try: () => JSON.parse(value),
                catch: (cause) => cause
              }).pipe(
                Effect.flatMap((json) => Schema.decode(User)(json))
              )
              : Effect.succeed(null)
          ),
          Effect.mapError((cause) => new CacheError({ cause }))
        )

      const set = (id: string, user: User) =>
        Schema.encode(User)(user).pipe(
          Effect.flatMap((encoded) => store.set(id, JSON.stringify(encoded))),
          Effect.mapError((cause) => new CacheError({ cause }))
        )

      return UserCache.of({ get, set })
    })
  )
}
Persisted cache
Create caches with automatic TTL:
import { PersistedCache } from "effect/unstable/persistence"
import { Effect, Schema } from "effect"
/** Schema class `User` with two string fields: `id` and `name`. */
class User extends Schema.Class<User>("User")({
  id: Schema.String,
  name: Schema.String
}) {}
// Persisted cache built with the `User` schema and a "5 minutes" TTL.
// `lookup` forwards the user id to `fetchUserFromDatabase`, which is not
// defined in this snippet.
const userCache = PersistedCache.make({
  schema: User,
  ttl: "5 minutes",
  lookup: (userId: string) => fetchUserFromDatabase(userId)
})
// Example usage: obtain the cache, read the same key twice, then invalidate it.
const program = Effect.gen(function*() {
  const cache = yield* userCache

  // Initial read: the value is fetched from the database
  const first = yield* cache.get("user-123")

  // Repeat read: the cached value is returned
  const second = yield* cache.get("user-123")

  // Drop the entry for this key
  yield* cache.invalidate("user-123")
})
Persisted queue
Durable queues that survive restarts:
import { PersistedQueue } from "effect/unstable/persistence"
import { Effect, Schema } from "effect"
/** Schema class `Task` with two string fields: `id` and `action`. */
class Task extends Schema.Class<Task>("Task")({
  id: Schema.String,
  action: Schema.String
}) {}
// Persisted queue named "tasks" whose items use the `Task` schema.
const taskQueue = PersistedQueue.make({
  schema: Task,
  name: "tasks"
})
// Example usage: enqueue two tasks, then take one and hand it to
// `processTask` (not defined in this snippet).
const program = Effect.gen(function*() {
  const queue = yield* taskQueue

  // Add two tasks to the queue
  yield* queue.offer({ id: "1", action: "process" })
  yield* queue.offer({ id: "2", action: "cleanup" })

  // Take one task and process it
  const next = yield* queue.take
  yield* processTask(next)
})
Rate limiter
Implement rate limiting:
import { RateLimiter } from "effect/unstable/persistence"
import { Effect } from "effect"
// Rate limiter configured with a limit of 100 and a "1 minute" window.
const limiter = RateLimiter.make({
  window: "1 minute",
  limit: 100
})
// Runs `processRequest` only when the limiter allows `userId`; otherwise
// fails with `RateLimitExceeded`. `userId`, `processRequest` and
// `RateLimitExceeded` are not defined in this snippet.
const rateLimitedRequest = Effect.gen(function*() {
  const allowed = yield* limiter.check(userId)
  if (allowed) {
    yield* processRequest()
    return
  }
  return yield* Effect.fail(new RateLimitExceeded())
})
Redis integration
Use Redis as the persistence backend:
import { Redis } from "effect/unstable/persistence"
import { Effect, Layer } from "effect"
// Redis layer configured with a local connection URL.
const RedisLayer = Redis.layer({
  url: "redis://localhost:6379"
})
// Use with key-value store
// `UserCache.layer` with `RedisLayer` provided to it (data-first call form).
const UserCacheLayer = Layer.provide(UserCache.layer, RedisLayer)
// Use with persisted cache
// Persisted cache for users with `RedisLayer` provided to it.
//
// The other snippets on this page `yield*` the result of
// `PersistedCache.make`, i.e. they treat it as an effect rather than a layer.
// A layer is therefore supplied with `Effect.provide`; `Layer.provide` only
// composes layers. `fetchUser` is not defined in this snippet.
const cachedUsers = PersistedCache.make({
  lookup: fetchUser,
  schema: User,
  ttl: "5 minutes"
}).pipe(
  Effect.provide(RedisLayer)
)
Persistable values
Define custom persistence logic:
import { Persistable } from "effect/unstable/persistence"
import { Schema } from "effect"
/** Schema class `CustomType` with a single string field, `data`. */
class CustomType extends Schema.Class<CustomType>("CustomType")({
  data: Schema.String
}) {}
// Custom persistence logic for `CustomType`:
// - `encode` serialises the value with `JSON.stringify`
// - `decode` parses the string as JSON and decodes it with the schema
const persistable = Persistable.make({
  encode: (value: CustomType) => {
    return JSON.stringify(value)
  },
  decode: (bytes: string) => {
    const parsed = JSON.parse(bytes)
    return Schema.decode(CustomType)(parsed)
  }
})
Persistence interface
Implement custom persistence backends:
import { Persistence, KeyValueStore } from "effect/unstable/persistence"
import { Effect, Layer } from "effect"
// Layer that supplies a key-value store implemented on top of `storage`.
//
// The imported `KeyValueStore` is the module; the service tag is
// `KeyValueStore.KeyValueStore`, which is what the `UserCache` snippet on
// this page yields. The tag is therefore used here for both `Layer.succeed`
// and `.of`.
//
// NOTE(review): `storage` is not defined in this snippet. Presumably it is a
// `Map<string, string>` created beforehand; confirm, and declare it before
// using this layer.
const MemoryStore = Layer.succeed(
  KeyValueStore.KeyValueStore,
  KeyValueStore.KeyValueStore.of({
    get: (key) => Effect.sync(() => storage.get(key) ?? null),
    set: (key, value) => Effect.sync(() => storage.set(key, value)),
    delete: (key) => Effect.sync(() => storage.delete(key)),
    has: (key) => Effect.sync(() => storage.has(key))
  })
)
Complete example
import { Effect, Layer, Schema, ServiceMap } from "effect"
import { KeyValueStore, PersistedCache, Redis } from "effect/unstable/persistence"
/** Schema class `User` with three string fields: `id`, `name` and `email`. */
class User extends Schema.Class<User>("User")({
  id: Schema.String,
  name: Schema.String,
  email: Schema.String
}) {}
/** Tagged error `CacheError`; carries the underlying `cause`. */
class CacheError extends Schema.TaggedErrorClass<CacheError>()("CacheError", {
  cause: Schema.Defect
}) {}
// Define database service
// Define database service
/**
 * Database service exposing `findUser(id)`.
 *
 * The bundled `layer` is a stub: it succeeds for every id with a fixed sample
 * user whose `id` echoes the argument.
 */
class Database extends ServiceMap.Service<Database, {
  findUser(id: string): Effect.Effect<User | null, DatabaseError>
}>()("app/Database") {
  static readonly layer = Layer.succeed(
    Database,
    Database.of({
      findUser: (id) =>
        Effect.succeed({
          id,
          name: "Alice",
          // Placeholder address on the reserved example.com domain
          email: "alice@example.com"
        })
    })
  )
}
/** Tagged error `DatabaseError`; carries the underlying `cause`. */
class DatabaseError extends Schema.TaggedErrorClass<DatabaseError>()("DatabaseError", {
  cause: Schema.Defect
}) {}
// Create cached user service
/**
 * User service exposing `getUser(id)`.
 *
 * The layer builds a persisted cache whose `lookup` calls
 * `Database.findUser`, using the `User` schema and a "5 minutes" TTL.
 * Errors from `cache.get` are wrapped in `CacheError`.
 */
class UserService extends ServiceMap.Service<UserService, {
  getUser(id: string): Effect.Effect<User | null, CacheError | DatabaseError>
}>()("app/UserService") {
  static readonly layer = Layer.effect(
    UserService,
    Effect.gen(function*() {
      const database = yield* Database

      const users = yield* PersistedCache.make({
        lookup: (id: string) => database.findUser(id),
        schema: User,
        ttl: "5 minutes"
      })

      return UserService.of({
        getUser: (id: string) =>
          users.get(id).pipe(
            Effect.mapError((cause) => new CacheError({ cause }))
          )
      })
    })
  )
}
// Configure Redis persistence
// Redis layer pointing at a local connection URL.
const RedisLayer = Redis.layer({
  url: "redis://localhost:6379"
})
// Compose layers
// Application layer: `UserService.layer` with `Database.layer` provided
// first, then `RedisLayer`.
const AppLayer = UserService.layer.pipe(
  Layer.provide(Database.layer),
  Layer.provide(RedisLayer)
)
// Use the service
// Example program: look up the same user twice through `UserService`, log
// each result, and run with `AppLayer` provided.
const program = Effect.gen(function*() {
  const userService = yield* UserService

  // Initial lookup: goes to the database
  const fresh = yield* userService.getUser("user-123")
  yield* Effect.log(`User: ${fresh?.name}`)

  // Repeat lookup: served from the cache
  const cached = yield* userService.getUser("user-123")
  yield* Effect.log(`Cached user: ${cached?.name}`)
}).pipe(
  Effect.provide(AppLayer)
)