Overview
The persistence layer (packages/persistence) provides type-safe database access using @effect/sql with PostgreSQL. All database operations return Effects with typed errors.
The persistence layer depends on packages/core for domain types but has no knowledge of HTTP, API, or frontend concerns.
Architecture
Repository (interface)
↓
RepositoryLive (implementation)
↓
@effect/sql (SQL client)
↓
PostgreSQL
Database Stack
PostgreSQL
Version: 14+ (recommended 16+)
Why PostgreSQL:
- ACID compliance for financial data
- JSON/JSONB support for flexible schemas
- Advanced features (CTEs, window functions, array types)
- Excellent performance for OLTP workloads
- Row-level security for multi-tenancy
@effect/sql
Purpose: Type-safe SQL query builder
Key Features:
- Template literal queries (write SQL naturally)
- Schema validation for results
- Transaction support with automatic rollback
- Connection pooling
- Prepared statement caching
import { SqlClient } from "@effect/sql"
import { SqlSchema } from "@effect/sql"
/**
 * Looks up one account row by primary key.
 *
 * Resolves the ambient SqlClient, runs the parameterised SELECT and yields the
 * first returned row; the result is `undefined` when nothing matched.
 */
const findById = (id: AccountId) =>
  SqlClient.SqlClient.pipe(
    Effect.flatMap(
      (sql) => sql`
        SELECT id, account_number, name, account_type
        FROM accounts
        WHERE id = ${id}
      `
    ),
    Effect.map((matches) => matches[0])
  )
Repository Pattern
Interface Definition
// packages/persistence/src/Services/AccountRepository.ts
import * as Context from "effect/Context"
import type * as Effect from "effect/Effect"
import type * as Option from "effect/Option"
import type { Account, AccountId } from "@accountability/core/accounting/Account"
import type { CompanyId } from "@accountability/core/company/Company"
import type { OrganizationId } from "@accountability/core/organization/Organization"
import type { PersistenceError, EntityNotFoundError } from "../Errors/RepositoryError"
/**
 * Persistence contract for chart-of-accounts entries.
 *
 * Every operation returns an Effect with its failures in the typed error
 * channel. All lookups and `update` take an `organizationId`; `create` only
 * takes the account itself.
 */
export interface AccountRepository {
/** Looks up an account by id within an organization; yields an Option rather than failing when absent. */
readonly findById: (
organizationId: OrganizationId,
id: AccountId
) => Effect.Effect<Option.Option<Account>, PersistenceError>
/** Lists the accounts of one company within an organization. */
readonly findByCompany: (
organizationId: OrganizationId,
companyId: CompanyId
) => Effect.Effect<ReadonlyArray<Account>, PersistenceError>
/** Stores a new account and yields the stored entity. */
readonly create: (
account: Account
) => Effect.Effect<Account, PersistenceError>
/** Updates an existing account; fails with EntityNotFoundError when there is nothing to update. */
readonly update: (
organizationId: OrganizationId,
account: Account
) => Effect.Effect<Account, EntityNotFoundError | PersistenceError>
/** Like `findById`, but a missing account is a failure (EntityNotFoundError) instead of an empty Option. */
readonly getById: (
organizationId: OrganizationId,
id: AccountId
) => Effect.Effect<Account, EntityNotFoundError | PersistenceError>
}
/**
 * Context tag (key "AccountRepository") used to request the repository from
 * the environment. It shares its name with the interface above, so the same
 * identifier serves as both the service type and the tag.
 */
export class AccountRepository extends Context.Tag("AccountRepository")<
AccountRepository,
AccountRepository
>() {}
Implementation
// packages/persistence/src/Layers/AccountRepositoryLive.ts
import { Layer, Effect } from "effect"
import * as Option from "effect/Option"
import * as Schema from "effect/Schema"
import { SqlClient } from "@effect/sql"
import { SqlSchema } from "@effect/sql"
import { Account } from "@accountability/core/accounting/Account"
import { AccountId } from "@accountability/core/accounting/Account"
import { CompanyId } from "@accountability/core/company/Company"
import { OrganizationId } from "@accountability/core/organization/Organization"
import { AccountRepository } from "../Services/AccountRepository"
import { PersistenceError, EntityNotFoundError } from "../Errors/RepositoryError"
/**
 * Schema for one row of the `accounts` table, using the snake_case column
 * names. Query results are decoded with it before being mapped to the
 * `Account` domain entity.
 */
const AccountRow = Schema.Struct({
id: AccountId,
company_id: CompanyId,
account_number: AccountNumber,
name: Schema.String,
account_type: AccountType,
category: AccountCategory,
normal_balance: NormalBalance,
// Nullable column: accounts without a parent store NULL here.
parent_account_id: Schema.NullOr(AccountId),
level: Schema.Number,
is_active: Schema.Boolean,
allow_manual_entries: Schema.Boolean,
// NOTE(review): DateFromString expects the driver to hand back strings for
// TIMESTAMPTZ columns. If the driver already parses them into Date objects,
// Schema.DateFromSelf would be needed instead — confirm against the client
// configuration.
created_at: Schema.DateFromString,
updated_at: Schema.DateFromString
})
/**
 * Maps a decoded `accounts` row (snake_case columns) onto the `Account`
 * domain entity (camelCase fields).
 *
 * The nullable parent id becomes an Option and both timestamps are converted
 * with `Timestamp.fromDate`.
 */
const rowToAccount = (row: typeof AccountRow.Type): Account => {
  const {
    id,
    company_id,
    account_number,
    name,
    account_type,
    category,
    normal_balance,
    parent_account_id,
    level,
    is_active,
    allow_manual_entries,
    created_at,
    updated_at
  } = row

  return Account.make({
    id,
    companyId: company_id,
    accountNumber: account_number,
    name,
    accountType: account_type,
    category,
    normalBalance: normal_balance,
    parentAccountId: Option.fromNullable(parent_account_id),
    level,
    isActive: is_active,
    allowManualEntries: allow_manual_entries,
    createdAt: Timestamp.fromDate(created_at),
    updatedAt: Timestamp.fromDate(updated_at)
  })
}
/**
 * Builds the live `AccountRepository` on top of the ambient `SqlClient`.
 *
 * - Reads are scoped to an organization by joining `accounts` to `companies`.
 * - Every driver or decoding failure is wrapped in `PersistenceError`, which
 *   is the error channel the `AccountRepository` interface declares.
 * - `update` reports `EntityNotFoundError` only when the UPDATE matched no
 *   row; genuine SQL failures are no longer disguised as "not found".
 */
const make = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient

  // Wraps a low-level failure (SQL error, schema decoding error) into the
  // PersistenceError advertised by the repository interface.
  const persistenceError = (operation: string) => (cause: unknown) =>
    new PersistenceError({ operation, cause })

  // Validates a raw row coming back from INSERT/UPDATE ... RETURNING.
  const decodeRow = Schema.decodeUnknown(AccountRow)

  // The Request schemas must be built from the id *schemas*, not from runtime
  // id values. NOTE(review): assumes OrganizationId is exported as a schema in
  // the same way AccountId and CompanyId are used in AccountRow — confirm.
  // The query functions are built once here instead of on every call.
  const selectById = SqlSchema.findOne({
    Request: Schema.Struct({ organizationId: OrganizationId, id: AccountId }),
    Result: AccountRow,
    execute: ({ organizationId, id }) => sql`
      SELECT a.*
      FROM accounts a
      JOIN companies c ON c.id = a.company_id
      WHERE a.id = ${id}
      AND c.organization_id = ${organizationId}
    `
  })

  const selectByCompany = SqlSchema.findAll({
    Request: Schema.Struct({
      organizationId: OrganizationId,
      companyId: CompanyId
    }),
    Result: AccountRow,
    execute: ({ organizationId, companyId }) => sql`
      SELECT a.*
      FROM accounts a
      JOIN companies c ON c.id = a.company_id
      WHERE a.company_id = ${companyId}
      AND c.organization_id = ${organizationId}
      ORDER BY a.account_number
    `
  })

  /** Finds an account by id inside the organization; `None` when absent. */
  const findById = (organizationId: OrganizationId, id: AccountId) =>
    selectById({ organizationId, id }).pipe(
      Effect.map(Option.map(rowToAccount)),
      Effect.mapError(persistenceError("AccountRepository.findById"))
    )

  /** Lists a company's accounts ordered by account number. */
  const findByCompany = (
    organizationId: OrganizationId,
    companyId: CompanyId
  ) =>
    selectByCompany({ organizationId, companyId }).pipe(
      Effect.map((rows) => rows.map(rowToAccount)),
      Effect.mapError(persistenceError("AccountRepository.findByCompany"))
    )

  /** Inserts the account and returns the row as stored by the database. */
  const create = (account: Account) =>
    sql`
      INSERT INTO accounts ${
        sql.insert({
          id: account.id,
          company_id: account.companyId,
          account_number: account.accountNumber,
          name: account.name,
          account_type: account.accountType,
          category: account.category,
          normal_balance: account.normalBalance,
          parent_account_id: Option.getOrNull(account.parentAccountId),
          level: account.level,
          is_active: account.isActive,
          allow_manual_entries: account.allowManualEntries
        })
      }
      RETURNING *
    `.pipe(
      // Decoding also covers the unexpected "no row returned" case: an
      // undefined first row fails validation and surfaces as PersistenceError.
      Effect.flatMap((rows) => decodeRow(rows[0])),
      Effect.map(rowToAccount),
      Effect.mapError(persistenceError("AccountRepository.create"))
    )

  /**
   * Updates the mutable columns of an account that belongs to the given
   * organization. Fails with EntityNotFoundError when no row was updated.
   */
  const update = (organizationId: OrganizationId, account: Account) =>
    sql`
      UPDATE accounts a
      SET ${
        sql.update({
          name: account.name,
          account_type: account.accountType,
          category: account.category,
          is_active: account.isActive,
          updated_at: new Date()
        })
      }
      FROM companies c
      WHERE a.id = ${account.id}
      AND a.company_id = c.id
      AND c.organization_id = ${organizationId}
      RETURNING a.*
    `.pipe(
      // Map SQL failures first so they stay distinguishable from "not found".
      Effect.mapError(persistenceError("AccountRepository.update")),
      Effect.flatMap((rows) => {
        const updated = rows[0]
        if (updated === undefined) {
          return Effect.fail(
            new EntityNotFoundError({ entity: "Account", id: account.id })
          )
        }
        return decodeRow(updated).pipe(
          Effect.map(rowToAccount),
          Effect.mapError(persistenceError("AccountRepository.update"))
        )
      })
    )

  /** Same lookup as `findById`, but a missing account is a failure. */
  const getById = (organizationId: OrganizationId, id: AccountId) =>
    findById(organizationId, id).pipe(
      Effect.flatMap(
        Option.match({
          onNone: () =>
            Effect.fail(new EntityNotFoundError({ entity: "Account", id })),
          onSome: Effect.succeed
        })
      )
    )

  return {
    findById,
    findByCompany,
    create,
    update,
    getById
  }
})
// Layer that registers the repository built by `make` under the AccountRepository tag.
export const AccountRepositoryLive = Layer.effect(AccountRepository, make)
SqlSchema Patterns
findOne - Returns Option<T>
Use when: You expect 0 or 1 result.
// `Request` validates/encodes the argument and `Result` decodes the row, so
// the value inside the Option has the Result schema's type (AccountRow here).
const findById = SqlSchema.findOne({
Request: AccountId,
Result: AccountRow,
execute: (id) => sql`
SELECT * FROM accounts WHERE id = ${id}
`
})
// Returns: Effect<Option<AccountRow>, SqlError | ParseError>
const account = yield* findById(accountId)
if (Option.isSome(account)) {
console.log(account.value.name)
}
findAll - Returns Array<T>
Use when: You expect 0 or more results.
// Every returned row is decoded with the Result schema (AccountRow here).
const findByCompany = SqlSchema.findAll({
Request: CompanyId,
Result: AccountRow,
execute: (companyId) => sql`
SELECT * FROM accounts WHERE company_id = ${companyId}
`
})
// Returns: Effect<ReadonlyArray<AccountRow>, SqlError | ParseError>
const accounts = yield* findByCompany(companyId)
console.log(`Found ${accounts.length} accounts`)
single - Expects Exactly 1 Result
Use when: You expect exactly 1 result. The effect fails with NoSuchElementException when no row comes back; if several rows come back, the first one is decoded.
const getById = SqlSchema.single({
  Request: AccountId,
  Result: AccountRow,
  execute: (id) => sql`
    SELECT * FROM accounts WHERE id = ${id}
  `
})
// Returns: Effect<AccountRow, SqlError | ParseError | NoSuchElementException>
const account = yield* getById(accountId)
// Nothing is thrown: a missing account is a NoSuchElementException in the
// error channel and must be handled (or mapped) like any other typed failure.
void - No Result Expected
Use when: INSERT/UPDATE/DELETE with no RETURNING clause.
// Only the request is validated; whatever the statement returns is discarded.
const deleteById = SqlSchema.void({
Request: AccountId,
execute: (id) => sql`
DELETE FROM accounts WHERE id = ${id}
`
})
// Returns: Effect<void, SqlError | ParseError>
yield* deleteById(accountId)
SQL Helpers
sql.insert - Insert Rows
// Single row — the object keys are used as the column names
sql`INSERT INTO accounts ${sql.insert({
id: "acc_123",
name: "Cash",
account_type: "Asset"
})}`
// Multiple rows — pass an array of objects with the same keys
sql`INSERT INTO accounts ${sql.insert([
{ id: "acc_1", name: "Cash", account_type: "Asset" },
{ id: "acc_2", name: "Bank", account_type: "Asset" }
])}`
// With RETURNING
// NOTE(review): `account` must be a row-shaped object (snake_case column keys)
// like the literals above, not a camelCase domain entity — confirm at call sites.
sql`
INSERT INTO accounts ${sql.insert(account)}
RETURNING *
`
sql.update - Update Statement
// The helper only produces the assignment list; SET and WHERE are written by hand.
sql`
UPDATE accounts
SET ${sql.update({
name: "New Name",
updated_at: new Date()
})}
WHERE id = ${id}
`
// Generates: SET name = $1, updated_at = $2
sql.in - IN Clause
// Each array element becomes its own bound parameter.
const accountIds = ["acc_1", "acc_2", "acc_3"]
sql`
SELECT * FROM accounts
WHERE id IN ${sql.in(accountIds)}
`
// Generates: WHERE id IN ($1, $2, $3)
sql.and - AND Conditions
// Each condition is itself a sql fragment; only interpolated values become
// bound parameters.
const conditions = [
sql`company_id = ${companyId}`,
sql`is_active = true`,
sql`account_type = 'Asset'`
]
sql`
SELECT * FROM accounts
WHERE ${sql.and(conditions)}
`
// Generates: WHERE (company_id = $1 AND is_active = true AND account_type = 'Asset')
// Only companyId is interpolated, so it is the only bound parameter; the other
// two fragments are emitted as literal SQL.
Transactions
sql.withTransaction
Purpose: Run multiple operations atomically.
/**
 * Stores a journal entry together with its lines and applies every line to
 * the matching account balance, all inside a single transaction. Yields the
 * entry that was passed in.
 */
const createJournalEntry = (
  entry: JournalEntry,
  lines: ReadonlyArray<JournalEntryLine>
) =>
  Effect.flatMap(SqlClient.SqlClient, (sql) => {
    // Insert journal entry
    const insertEntry = sql`
      INSERT INTO journal_entries ${sql.insert(entry)}
    `
    // Insert all lines
    const insertLines = sql`
      INSERT INTO journal_entry_lines ${sql.insert(lines)}
    `
    // Update account balances
    const applyToBalance = (line: JournalEntryLine) => sql`
      UPDATE account_balances
      SET balance = balance + ${line.debit} - ${line.credit}
      WHERE account_id = ${line.accountId}
    `

    // The statements above are only descriptions; they run, in this order,
    // once the transaction effect is executed.
    return sql.withTransaction(
      insertEntry.pipe(
        Effect.zipRight(insertLines),
        Effect.zipRight(Effect.forEach(lines, applyToBalance)),
        Effect.as(entry)
      )
    )
  })
// If any operation fails, entire transaction rolls back
Nested Transactions
// operation1 and operation4 run in the outer transaction; operation2 and
// operation3 run in an inner withTransaction nested inside it.
const outerTransaction = sql.withTransaction(
Effect.gen(function* () {
yield* operation1()
// Nested transaction (uses savepoint)
yield* sql.withTransaction(
Effect.gen(function* () {
yield* operation2()
yield* operation3()
})
)
yield* operation4()
})
)
Migrations
Migration Structure
// packages/persistence/src/Migrations/Migration0003_CreateAccounts.ts
import { Effect } from "effect"
import { SqlClient } from "@effect/sql"
/**
 * Migration 0003: creates the `accounts` table and its lookup indexes.
 *
 * Every statement uses IF NOT EXISTS, so the migration can be re-run safely
 * if an earlier attempt stopped after creating only part of the objects.
 */
export const Migration0003_CreateAccounts = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient

  yield* sql`
    CREATE TABLE IF NOT EXISTS accounts (
      id TEXT PRIMARY KEY,
      company_id TEXT NOT NULL REFERENCES companies(id),
      account_number TEXT NOT NULL,
      name TEXT NOT NULL,
      account_type TEXT NOT NULL CHECK (account_type IN (
        'Asset', 'Liability', 'Equity', 'Revenue', 'Expense'
      )),
      category TEXT NOT NULL,
      normal_balance TEXT NOT NULL CHECK (normal_balance IN ('Debit', 'Credit')),
      parent_account_id TEXT REFERENCES accounts(id),
      level INTEGER NOT NULL DEFAULT 0,
      is_active BOOLEAN NOT NULL DEFAULT true,
      allow_manual_entries BOOLEAN NOT NULL DEFAULT true,
      require_department BOOLEAN NOT NULL DEFAULT false,
      require_project BOOLEAN NOT NULL DEFAULT false,
      currency_code TEXT,
      created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      UNIQUE (company_id, account_number)
    )
  `

  // Indexes are guarded the same way as the table so a retry does not fail
  // with "relation already exists".
  yield* sql`
    CREATE INDEX IF NOT EXISTS idx_accounts_company_id ON accounts(company_id)
  `
  yield* sql`
    CREATE INDEX IF NOT EXISTS idx_accounts_parent_id ON accounts(parent_account_id)
  `
  yield* sql`
    CREATE INDEX IF NOT EXISTS idx_accounts_type ON accounts(account_type)
  `
})
Running Migrations
// packages/persistence/src/Layers/MigrationsLive.ts
import { Layer, Effect } from "effect"
import { SqlClient } from "@effect/sql"
/**
 * Applies all pending migrations in list order.
 *
 * Applied migrations are tracked by id in the `migrations` table. Each
 * migration runs in the same transaction as the INSERT that records it, so a
 * failure leaves neither a half-applied schema change nor a schema change
 * without its bookkeeping row.
 */
const runMigrations = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient

  // Create migrations table
  yield* sql`
    CREATE TABLE IF NOT EXISTS migrations (
      id TEXT PRIMARY KEY,
      applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    )
  `

  // Get applied migrations
  const applied = yield* sql`SELECT id FROM migrations`
  const appliedIds = new Set(applied.map((row) => row.id))

  // Run pending migrations in order
  const migrations = [
    { id: "0001", effect: Migration0001_CreateOrganizations },
    { id: "0002", effect: Migration0002_CreateCompanies },
    { id: "0003", effect: Migration0003_CreateAccounts },
    // ...
  ]

  for (const migration of migrations) {
    if (appliedIds.has(migration.id)) {
      continue
    }
    yield* Effect.log(`Running migration ${migration.id}`)
    // Schema change and bookkeeping row commit (or roll back) together.
    yield* sql.withTransaction(
      migration.effect.pipe(
        Effect.zipRight(
          sql`INSERT INTO migrations (id) VALUES (${migration.id})`
        )
      )
    )
  }
})
/**
 * Layer registered under the `Migrations` tag. Building it runs
 * `runMigrations`; the service value itself is an empty object.
 */
export const MigrationsLive = Layer.effect(
  Migrations,
  Effect.as(runMigrations, {})
)
Database Schema
Key Tables
organizations
-- Top-level record; companies point at it through companies.organization_id.
CREATE TABLE organizations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
reporting_currency TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
companies
-- One row per company. Each company belongs to exactly one organization and
-- may optionally reference a parent company in the same table.
CREATE TABLE companies (
id TEXT PRIMARY KEY,
organization_id TEXT NOT NULL REFERENCES organizations(id),
name TEXT NOT NULL,
legal_name TEXT NOT NULL,
company_type TEXT NOT NULL,
functional_currency TEXT NOT NULL,
reporting_currency TEXT NOT NULL,
jurisdiction TEXT NOT NULL,
tax_id TEXT,
fiscal_year_end_month INTEGER NOT NULL,
fiscal_year_end_day INTEGER NOT NULL,
-- Nullable self-reference: NULL when the company has no parent.
parent_company_id TEXT REFERENCES companies(id),
ownership_percentage DECIMAL(5,2),
is_active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes on both foreign-key columns.
CREATE INDEX idx_companies_organization ON companies(organization_id);
CREATE INDEX idx_companies_parent ON companies(parent_company_id);
accounts
-- Chart-of-accounts entries. Account numbers are unique per company, and
-- parent_account_id is a nullable self-reference.
-- NOTE(review): the Migration0003_CreateAccounts example additionally defines
-- CHECK constraints on account_type / normal_balance, the columns
-- require_department, require_project and currency_code, an index on
-- account_type, and different index names — confirm which definition is current.
CREATE TABLE accounts (
id TEXT PRIMARY KEY,
company_id TEXT NOT NULL REFERENCES companies(id),
account_number TEXT NOT NULL,
name TEXT NOT NULL,
account_type TEXT NOT NULL,
category TEXT NOT NULL,
normal_balance TEXT NOT NULL,
parent_account_id TEXT REFERENCES accounts(id),
level INTEGER NOT NULL DEFAULT 0,
is_active BOOLEAN NOT NULL DEFAULT true,
allow_manual_entries BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (company_id, account_number)
);
CREATE INDEX idx_accounts_company ON accounts(company_id);
CREATE INDEX idx_accounts_parent ON accounts(parent_account_id);
journal_entries
-- Journal entry headers. Entry numbers are unique per company.
-- NOTE(review): fiscal_period_id references fiscal_periods(id), a table that
-- is not listed in this section.
CREATE TABLE journal_entries (
id TEXT PRIMARY KEY,
company_id TEXT NOT NULL REFERENCES companies(id),
entry_number TEXT NOT NULL,
reference TEXT,
description TEXT NOT NULL,
entry_date DATE NOT NULL,
-- Nullable, unlike entry_date.
posting_date DATE,
entry_type TEXT NOT NULL,
status TEXT NOT NULL,
fiscal_period_id TEXT NOT NULL REFERENCES fiscal_periods(id),
currency_code TEXT NOT NULL,
created_by TEXT NOT NULL,
approved_by TEXT,
posted_by TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (company_id, entry_number)
);
CREATE INDEX idx_journal_entries_company ON journal_entries(company_id);
CREATE INDEX idx_journal_entries_date ON journal_entries(entry_date);
CREATE INDEX idx_journal_entries_period ON journal_entries(fiscal_period_id);
CREATE INDEX idx_journal_entries_status ON journal_entries(status);
journal_entry_lines
-- Debit/credit lines of a journal entry. Lines are deleted together with
-- their entry (ON DELETE CASCADE) and are numbered uniquely within it.
CREATE TABLE journal_entry_lines (
id TEXT PRIMARY KEY,
journal_entry_id TEXT NOT NULL REFERENCES journal_entries(id) ON DELETE CASCADE,
line_number INTEGER NOT NULL,
account_id TEXT NOT NULL REFERENCES accounts(id),
debit DECIMAL(19,4) NOT NULL DEFAULT 0,
credit DECIMAL(19,4) NOT NULL DEFAULT 0,
currency_code TEXT NOT NULL,
exchange_rate DECIMAL(19,10),
functional_currency_debit DECIMAL(19,4) NOT NULL DEFAULT 0,
functional_currency_credit DECIMAL(19,4) NOT NULL DEFAULT 0,
description TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (journal_entry_id, line_number),
-- Amounts are never negative ...
CHECK (debit >= 0 AND credit >= 0),
-- ... and a single line cannot carry both a debit and a credit.
CHECK (NOT (debit > 0 AND credit > 0))
);
CREATE INDEX idx_journal_entry_lines_entry ON journal_entry_lines(journal_entry_id);
CREATE INDEX idx_journal_entry_lines_account ON journal_entry_lines(account_id);
Connection Pooling
// packages/persistence/src/Layers/PgClientLive.ts
import { PgClient } from "@effect/sql-pg"
import { Config, Layer, Duration } from "effect"
/**
 * PostgreSQL client layer configured from the environment.
 *
 * `layerConfig` is used because the connection URL is a `Config` value read
 * from DATABASE_URL; with `layerConfig` every field is supplied as a Config,
 * so constants are wrapped with `Config.succeed`.
 */
export const PgClientLive = PgClient.layerConfig({
  url: Config.redacted("DATABASE_URL"),
  // Upper bound on pooled connections.
  maxConnections: Config.succeed(20),
  idleTimeout: Config.succeed(Duration.seconds(30)),
  connectTimeout: Config.succeed(Duration.seconds(10))
  // NOTE(review): the earlier version of this snippet also passed
  // `statementTimeout: 60s`. The PgClient configuration appears to have no
  // such field, so a statement timeout would have to be applied on the
  // database side (e.g. `statement_timeout` on the role or connection
  // options) — confirm against the installed @effect/sql-pg version.
})
Error Handling
Repository Errors
/**
 * Generic failure of a persistence operation. `operation` names what was
 * being attempted and `cause` carries the underlying error unchanged.
 */
export class PersistenceError extends Schema.TaggedError<PersistenceError>()(
  "PersistenceError",
  {
    operation: Schema.String,
    cause: Schema.Unknown
  }
) {}
/**
 * Raised when an entity that was expected to exist could not be found.
 * `entity` is the entity kind (e.g. "Account") and `id` its identifier.
 */
export class EntityNotFoundError extends Schema.TaggedError<EntityNotFoundError>()(
  "EntityNotFoundError",
  {
    entity: Schema.String,
    id: Schema.String
  }
) {
  /** Human-readable description built from the entity kind and id. */
  get message(): string {
    return `${this.entity} not found: ${this.id}`
  }
}
/**
 * Raised when an insert collides with an existing unique key.
 * `entity` is the entity kind and `key` the conflicting key value.
 */
export class DuplicateKeyError extends Schema.TaggedError<DuplicateKeyError>()(
  "DuplicateKeyError",
  {
    entity: Schema.String,
    key: Schema.String
  }
) {
  /** Human-readable description built from the entity kind and key. */
  get message(): string {
    return `${this.entity} already exists: ${this.key}`
  }
}
Mapping SQL Errors
/**
 * Reports whether `error`, or any error nested in its `cause` chain, is a
 * PostgreSQL unique-constraint violation.
 *
 * The SQL client wraps driver errors, so the wrapper's own message usually
 * does not contain the driver text. The check therefore walks the `cause`
 * chain and looks for SQLSTATE 23505 (unique_violation), keeping the
 * "duplicate key" message match as a fallback.
 */
const isUniqueViolation = (error: unknown): boolean => {
  const visited = new Set<unknown>()
  let current: unknown = error
  // `visited` guards against self-referencing cause chains.
  while (typeof current === "object" && current !== null && !visited.has(current)) {
    visited.add(current)
    const { code, message, cause } = current as {
      code?: unknown
      message?: unknown
      cause?: unknown
    }
    if (code === "23505") {
      return true
    }
    if (typeof message === "string" && message.includes("duplicate key")) {
      return true
    }
    current = cause
  }
  return false
}

/**
 * Inserts an account and translates failures into repository errors:
 * a unique-constraint violation becomes DuplicateKeyError, anything else
 * (SQL or decoding failure) becomes PersistenceError.
 */
const create = (account: Account) =>
  // NOTE(review): sql.insert uses the object's keys as column names, so
  // `account` must be row-shaped (snake_case keys) here — confirm.
  sql`INSERT INTO accounts ${sql.insert(account)} RETURNING *`.pipe(
    // Validate the returned row; SqlSchema.single is a query builder, not a
    // pipe operator, so the row is decoded directly.
    Effect.flatMap((rows) => Schema.decodeUnknown(AccountRow)(rows[0])),
    Effect.map(rowToAccount),
    Effect.mapError((error) =>
      isUniqueViolation(error)
        ? new DuplicateKeyError({
            entity: "Account",
            key: account.accountNumber
          })
        : new PersistenceError({
            operation: "create account",
            cause: error
          })
    )
  )
Testing with Testcontainers
import { PostgreSqlContainer } from "@testcontainers/postgresql"
import { inject } from "vitest"
// vitest.global-setup.ts
// Kept at module level so that teardown can stop the container that setup
// started; it stays undefined if setup never ran or failed before starting it.
let container: Awaited<ReturnType<PostgreSqlContainer["start"]>> | undefined

/**
 * Global setup: starts a throwaway PostgreSQL container and publishes its
 * connection URI to the tests under the "dbUrl" key.
 */
export async function setup({ provide }) {
  container = await new PostgreSqlContainer("postgres:alpine").start()
  provide("dbUrl", container.getConnectionUri())
}

/** Global teardown: stops the container if one was started. */
export async function teardown() {
  await container?.stop()
  container = undefined
}
// test/utils.ts
/**
 * Client layer for tests, connected to the container URL published by the
 * global setup.
 *
 * `PgClient.layer` is used instead of calling `PgClient.make` by hand: it
 * manages the connection pool's lifetime and also registers the generic
 * `SqlClient` service that the repositories request. The URL is read lazily
 * (inside `Effect.sync`) so `inject` runs when the layer is built.
 */
export const SharedPgClientLive = Layer.unwrapEffect(
  Effect.sync(() =>
    PgClient.layer({ url: Redacted.make(inject("dbUrl") as string) })
  )
)
// account.test.ts
// Repository and migrations are merged into one layer and both receive their
// database client from SharedPgClientLive.
const TestLayer = Layer.mergeAll(
AccountRepositoryLive,
MigrationsLive
).pipe(
Layer.provideMerge(SharedPgClientLive)
)
// All tests inside the callback share TestLayer.
it.layer(TestLayer)("AccountRepository", (it) => {
it.effect("creates and retrieves account", () =>
Effect.gen(function*() {
const repo = yield* AccountRepository
// NOTE(review): accounts.company_id references companies(id), so the company
// (and organization) used by testAccount presumably has to be inserted
// beforehand — confirm how the fixtures are seeded.
const account = yield* repo.create(testAccount)
const retrieved = yield* repo.getById(orgId, account.id)
expect(retrieved.name).toBe(testAccount.name)
})
)
})
Performance Optimization
Indexing Strategy
-- Foreign keys (always index)
CREATE INDEX idx_accounts_company ON accounts(company_id);
-- Common WHERE clauses
CREATE INDEX idx_journal_entries_date ON journal_entries(entry_date);
CREATE INDEX idx_journal_entries_status ON journal_entries(status);
-- Composite indexes for common queries
CREATE INDEX idx_journal_entries_company_date
ON journal_entries(company_id, entry_date);
-- Partial indexes for filtered queries
CREATE INDEX idx_accounts_active
ON accounts(company_id)
WHERE is_active = true;
Query Optimization
// The three constants below are statement values built with the sql tag; as
// in the earlier examples they still have to be yielded to be executed.
// Use SELECT * sparingly - specify columns
const findById = sql`
SELECT id, name, account_type, is_active
FROM accounts
WHERE id = ${id}
`
// Use LIMIT for large result sets
const findRecent = sql`
SELECT * FROM journal_entries
WHERE company_id = ${companyId}
ORDER BY entry_date DESC
LIMIT 100
`
// Use EXISTS for existence checks
const exists = sql`
SELECT EXISTS(
SELECT 1 FROM accounts WHERE id = ${id}
)
`
Next Steps
- Effect Framework - Effect patterns in repositories
- Domain Model - Entities being persisted
- Testing - Testing database code
- Error Handling - Error patterns