Skip to main content

Overview

The persistence layer (packages/persistence) provides type-safe database access using @effect/sql with PostgreSQL. All database operations return Effects with typed errors.
The persistence layer depends on packages/core for domain types but has no knowledge of HTTP, API, or frontend concerns.

Architecture

Repository (interface)
        ↓
RepositoryLive (implementation)
        ↓
@effect/sql (SQL client)
        ↓
PostgreSQL

Database Stack

PostgreSQL

Version: 14+ (16+ recommended)
Why PostgreSQL:
  • ACID compliance for financial data
  • JSON/JSONB support for flexible schemas
  • Advanced features (CTEs, window functions, array types)
  • Excellent performance for OLTP workloads
  • Row-level security for multi-tenancy

@effect/sql

Purpose: Type-safe SQL query builder
Key Features:
  • Template literal queries (write SQL naturally)
  • Schema validation for results
  • Transaction support with automatic rollback
  • Connection pooling
  • Prepared statement caching
Basic Query:
import { SqlClient } from "@effect/sql"
import { SqlSchema } from "@effect/sql"

/**
 * Looks up one account row by primary key.
 *
 * Resolves the `SqlClient` service, runs a parameterised SELECT, and yields
 * the first row of the result (`undefined` when nothing matches).
 */
const findById = (id: AccountId) =>
  SqlClient.SqlClient.pipe(
    Effect.flatMap((sql) => sql`
      SELECT id, account_number, name, account_type
      FROM accounts
      WHERE id = ${id}
    `),
    Effect.map((matches) => matches[0])
  )

Repository Pattern

Interface Definition

// packages/persistence/src/Services/AccountRepository.ts
import * as Context from "effect/Context"
import type * as Effect from "effect/Effect"
import type * as Option from "effect/Option"
import type { Account, AccountId } from "@accountability/core/accounting/Account"
import type { CompanyId } from "@accountability/core/company/Company"
import type { OrganizationId } from "@accountability/core/organization/Organization"
import type { PersistenceError, EntityNotFoundError } from "../Errors/RepositoryError"

/**
 * Operations an account repository implementation must provide.
 *
 * The service shape is declared under its own name (`AccountRepositoryService`)
 * rather than sharing the tag's name: an interface and a class with the same
 * name are merged by TypeScript, which would make the tag's own instance
 * members part of the required service shape.
 */
export interface AccountRepositoryService {
  /** Looks an account up by id within an organization; `None` when absent. */
  readonly findById: (
    organizationId: OrganizationId,
    id: AccountId
  ) => Effect.Effect<Option.Option<Account>, PersistenceError>

  /** Lists the accounts of one company within an organization. */
  readonly findByCompany: (
    organizationId: OrganizationId,
    companyId: CompanyId
  ) => Effect.Effect<ReadonlyArray<Account>, PersistenceError>

  /** Persists a new account and returns the stored value. */
  readonly create: (
    account: Account
  ) => Effect.Effect<Account, PersistenceError>

  /** Updates an existing account; fails with `EntityNotFoundError` when it does not exist. */
  readonly update: (
    organizationId: OrganizationId,
    account: Account
  ) => Effect.Effect<Account, EntityNotFoundError | PersistenceError>

  /** Like `findById`, but a missing account is an `EntityNotFoundError` failure. */
  readonly getById: (
    organizationId: OrganizationId,
    id: AccountId
  ) => Effect.Effect<Account, EntityNotFoundError | PersistenceError>
}

/**
 * Context tag used to request the repository (`yield* AccountRepository`)
 * and to provide it from a Layer. The first type argument is the tag's
 * identifier type, the second the service it resolves to.
 */
export class AccountRepository extends Context.Tag("AccountRepository")<
  AccountRepository,
  AccountRepositoryService
>() {}

Implementation

// packages/persistence/src/Layers/AccountRepositoryLive.ts
import { Layer, Effect } from "effect"
import * as Option from "effect/Option"
import * as Schema from "effect/Schema"
import { SqlClient } from "@effect/sql"
import { SqlSchema } from "@effect/sql"
import { Account, AccountId } from "@accountability/core/accounting/Account"
import { CompanyId } from "@accountability/core/company/Company"
import { OrganizationId } from "@accountability/core/organization/Organization"
import { AccountRepository } from "../Services/AccountRepository"
import { PersistenceError, EntityNotFoundError } from "../Errors/RepositoryError"

/**
 * Schema for one row of the `accounts` table as it comes back from the SQL
 * client. Field names mirror the snake_case column names; `rowToAccount`
 * converts a decoded row into the domain `Account`.
 */
const AccountRow = Schema.Struct({
  id: AccountId,
  company_id: CompanyId,
  account_number: AccountNumber,
  name: Schema.String,
  account_type: AccountType,
  category: AccountCategory,
  normal_balance: NormalBalance,
  // Nullable column; `rowToAccount` turns null into Option.none().
  parent_account_id: Schema.NullOr(AccountId),
  level: Schema.Number,
  is_active: Schema.Boolean,
  allow_manual_entries: Schema.Boolean,
  // NOTE(review): DateFromString expects the driver to return strings. If the
  // PostgreSQL driver already parses TIMESTAMPTZ into Date objects, these two
  // fields need Schema.DateFromSelf instead — confirm against the client in use.
  created_at: Schema.DateFromString,
  updated_at: Schema.DateFromString
})

/**
 * Converts a decoded `accounts` row (snake_case columns) into the domain
 * `Account` (camelCase fields). The nullable parent id becomes an `Option`
 * and both timestamps are wrapped with `Timestamp.fromDate`.
 */
const rowToAccount = (row: typeof AccountRow.Type): Account => {
  const {
    id,
    name,
    category,
    level,
    company_id: companyId,
    account_number: accountNumber,
    account_type: accountType,
    normal_balance: normalBalance,
    parent_account_id: parentId,
    is_active: isActive,
    allow_manual_entries: allowManualEntries,
    created_at: createdOn,
    updated_at: updatedOn
  } = row

  return Account.make({
    id,
    companyId,
    accountNumber,
    name,
    accountType,
    category,
    normalBalance,
    parentAccountId: Option.fromNullable(parentId),
    level,
    isActive,
    allowManualEntries,
    createdAt: Timestamp.fromDate(createdOn),
    updatedAt: Timestamp.fromDate(updatedOn)
  })
}

/**
 * Builds the SQL-backed account repository.
 *
 * Every query goes through a `SqlSchema` helper constructed once, up front:
 * the helper encodes the request with the `Request` schema, runs `execute`,
 * and decodes the returned rows with `AccountRow`. Reads and updates are
 * scoped to an organization by joining `accounts` to `companies`.
 *
 * Low-level failures (SQL errors, row-decoding errors) are wrapped in
 * `PersistenceError`, so the error channel matches what the repository
 * interface declares. Only "no matching row" becomes `EntityNotFoundError`.
 */
const make = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient

  // Wraps any failure in a PersistenceError, keeping the original as `cause`.
  const toPersistenceError = (operation: string) => (cause: unknown) =>
    new PersistenceError({ operation, cause })

  // `Request` has to be a Schema describing the input, not the runtime
  // argument values themselves. The id schemas are reused here the same way
  // AccountRow uses them.
  const selectAccount = SqlSchema.findOne({
    Request: Schema.Struct({ organizationId: OrganizationId, id: AccountId }),
    Result: AccountRow,
    execute: ({ organizationId, id }) => sql`
      SELECT a.*
      FROM accounts a
      JOIN companies c ON c.id = a.company_id
      WHERE a.id = ${id}
        AND c.organization_id = ${organizationId}
    `
  })

  const selectAccountsByCompany = SqlSchema.findAll({
    Request: Schema.Struct({ organizationId: OrganizationId, companyId: CompanyId }),
    Result: AccountRow,
    execute: ({ organizationId, companyId }) => sql`
      SELECT a.*
      FROM accounts a
      JOIN companies c ON c.id = a.company_id
      WHERE a.company_id = ${companyId}
        AND c.organization_id = ${organizationId}
      ORDER BY a.account_number
    `
  })

  // created_at / updated_at are left out so the column defaults apply.
  const insertAccount = SqlSchema.single({
    Request: AccountRow.omit("created_at", "updated_at"),
    Result: AccountRow,
    execute: (row) => sql`
      INSERT INTO accounts ${sql.insert(row)}
      RETURNING *
    `
  })

  const updateAccount = SqlSchema.single({
    Request: Schema.Struct({
      organizationId: OrganizationId,
      id: AccountId,
      name: Schema.String,
      account_type: AccountType,
      category: AccountCategory,
      is_active: Schema.Boolean
    }),
    Result: AccountRow,
    // Everything except the two lookup keys is a column to overwrite.
    execute: ({ organizationId, id, ...changes }) => sql`
      UPDATE accounts a
      SET ${sql.update({ ...changes, updated_at: new Date() })}
      FROM companies c
      WHERE a.id = ${id}
        AND a.company_id = c.id
        AND c.organization_id = ${organizationId}
      RETURNING a.*
    `
  })

  /** Finds an account by id inside an organization; `None` when there is no match. */
  const findById = (
    organizationId: OrganizationId,
    id: AccountId
  ) =>
    selectAccount({ organizationId, id }).pipe(
      Effect.map(Option.map(rowToAccount)),
      Effect.mapError(toPersistenceError("AccountRepository.findById"))
    )

  /** Lists a company's accounts ordered by account number. */
  const findByCompany = (
    organizationId: OrganizationId,
    companyId: CompanyId
  ) =>
    selectAccountsByCompany({ organizationId, companyId }).pipe(
      Effect.map((rows) => rows.map(rowToAccount)),
      Effect.mapError(toPersistenceError("AccountRepository.findByCompany"))
    )

  /** Inserts the account and returns it as stored (including DB-assigned timestamps). */
  const create = (account: Account) =>
    insertAccount({
      id: account.id,
      company_id: account.companyId,
      account_number: account.accountNumber,
      name: account.name,
      account_type: account.accountType,
      category: account.category,
      normal_balance: account.normalBalance,
      parent_account_id: Option.getOrNull(account.parentAccountId),
      level: account.level,
      is_active: account.isActive,
      allow_manual_entries: account.allowManualEntries
    }).pipe(
      Effect.map(rowToAccount),
      Effect.mapError(toPersistenceError("AccountRepository.create"))
    )

  /**
   * Updates name, type, category and active flag of an existing account.
   * Fails with `EntityNotFoundError` only when no row matched; any other
   * failure is reported as `PersistenceError` instead of being disguised as
   * "not found".
   */
  const update = (
    organizationId: OrganizationId,
    account: Account
  ) =>
    updateAccount({
      organizationId,
      id: account.id,
      name: account.name,
      account_type: account.accountType,
      category: account.category,
      is_active: account.isActive
    }).pipe(
      Effect.map(rowToAccount),
      Effect.mapError((error) =>
        // SqlSchema.single signals "no rows returned" with NoSuchElementException.
        error._tag === "NoSuchElementException"
          ? new EntityNotFoundError({ entity: "Account", id: account.id })
          : new PersistenceError({ operation: "AccountRepository.update", cause: error })
      )
    )

  /** Same lookup as `findById`, but a missing account is a failure. */
  const getById = (
    organizationId: OrganizationId,
    id: AccountId
  ) =>
    findById(organizationId, id).pipe(
      Effect.flatMap(
        Option.match({
          onNone: () => Effect.fail(
            new EntityNotFoundError({ entity: "Account", id })
          ),
          onSome: Effect.succeed
        })
      )
    )

  return {
    findById,
    findByCompany,
    create,
    update,
    getById
  }
})

/** Layer that provides `AccountRepository`; requires a `SqlClient`. */
export const AccountRepositoryLive = Layer.effect(AccountRepository, make)

SqlSchema Patterns

findOne - Returns Option<T>

Use when: You expect 0 or 1 result.
// SqlSchema.findOne returns a function: call it with a request value to run the query.
const findById = SqlSchema.findOne({
  Request: AccountId,
  Result: AccountRow,
  execute: (id) => sql`
    SELECT * FROM accounts WHERE id = ${id}
  `
})

// Returns: Effect<Option<typeof AccountRow.Type>, SqlError | ParseError>
// (the value is the decoded row, not yet a domain Account; ParseError covers
// request encoding and row decoding)
const account = yield* findById(accountId)
if (Option.isSome(account)) {
  console.log(account.value.name)
}

findAll - Returns Array<T>

Use when: You expect 0 or more results.
// SqlSchema.findAll returns a function: call it with a request value to run the query.
const findByCompany = SqlSchema.findAll({
  Request: CompanyId,
  Result: AccountRow,
  execute: (companyId) => sql`
    SELECT * FROM accounts WHERE company_id = ${companyId}
  `
})

// Returns: Effect<ReadonlyArray<typeof AccountRow.Type>, SqlError | ParseError>
// (decoded rows, not yet domain Accounts; an empty array when nothing matches)
const accounts = yield* findByCompany(companyId)
console.log(`Found ${accounts.length} accounts`)

single - Expects Exactly 1 Result

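Use when: You expect exactly 1 result. It fails with NoSuchElementException when the query returns no rows; if several rows come back, only the first one is decoded.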
// SqlSchema.single returns a function: call it with a request value to run the query.
const getById = SqlSchema.single({
  Request: AccountId,
  Result: AccountRow,
  execute: (id) => sql`
    SELECT * FROM accounts WHERE id = ${id}
  `
})

// Returns: Effect<typeof AccountRow.Type, SqlError | ParseError | NoSuchElementException>
const account = yield* getById(accountId)
// Nothing is thrown: a missing account is a NoSuchElementException in the error channel

void - No Result Expected

Use when: INSERT/UPDATE/DELETE with no RETURNING clause.
// No Result schema: whatever the statement returns is discarded.
const deleteById = SqlSchema.void({
  Request: AccountId,
  execute: (id) => sql`
    DELETE FROM accounts WHERE id = ${id}
  `
})

// Returns: Effect<void, SqlError | ParseError> (ParseError comes from encoding the request)
yield* deleteById(accountId)

SQL Helpers

sql.insert - Insert Rows

// Single row
sql`INSERT INTO accounts ${sql.insert({
  id: "acc_123",
  name: "Cash",
  account_type: "Asset"
})}`

// Multiple rows
sql`INSERT INTO accounts ${sql.insert([
  { id: "acc_1", name: "Cash", account_type: "Asset" },
  { id: "acc_2", name: "Bank", account_type: "Asset" }
])}`

// With RETURNING
sql`
  INSERT INTO accounts ${sql.insert(account)}
  RETURNING *
`

sql.update - Update Statement

sql`
  UPDATE accounts
  SET ${sql.update({
    name: "New Name",
    updated_at: new Date()
  })}
  WHERE id = ${id}
`
// Generates: SET name = $1, updated_at = $2

sql.in - IN Clause

const accountIds = ["acc_1", "acc_2", "acc_3"]

sql`
  SELECT * FROM accounts
  WHERE id IN ${sql.in(accountIds)}
`
// Generates: WHERE id IN ($1, $2, $3)

sql.and - AND Conditions

// Each condition is its own SQL fragment; only interpolated values (${...})
// become bound parameters, literals written in the SQL text stay inline.
const conditions = [
  sql`company_id = ${companyId}`,
  sql`is_active = true`,
  sql`account_type = 'Asset'`
]

sql`
  SELECT * FROM accounts
  WHERE ${sql.and(conditions)}
`
// Generates: WHERE (company_id = $1 AND is_active = true AND account_type = 'Asset')

Transactions

sql.withTransaction

Purpose: Run multiple operations atomically.
/**
 * Persists a journal entry together with its lines and applies each line to
 * the running account balance, all inside one transaction.
 *
 * @param entry - the journal entry header to insert
 * @param lines - the entry's lines; each adjusts `account_balances` by
 *                `debit - credit` for its account
 * @returns the `entry` that was passed in
 *
 * If any statement fails, the whole transaction is rolled back.
 */
const createJournalEntry = (
  entry: JournalEntry,
  lines: ReadonlyArray<JournalEntryLine>
) =>
  Effect.gen(function* () {
    const sql = yield* SqlClient.SqlClient

    return yield* sql.withTransaction(
      Effect.gen(function* () {
        // Insert journal entry
        yield* sql`
          INSERT INTO journal_entries ${sql.insert(entry)}
        `

        // Insert all lines
        yield* sql`
          INSERT INTO journal_entry_lines ${sql.insert(lines)}
        `

        // Update account balances. The per-statement results are not needed,
        // so `discard` avoids collecting them into an array.
        yield* Effect.forEach(
          lines,
          (line) =>
            sql`
              UPDATE account_balances
              SET balance = balance + ${line.debit} - ${line.credit}
              WHERE account_id = ${line.accountId}
            `,
          { discard: true }
        )

        return entry
      })
    )
  })
// If any operation fails, entire transaction rolls back

Nested Transactions

// Runs operation1..operation4 in a single outer transaction. The inner
// withTransaction call groups operation2 and operation3 inside it.
const outerTransaction = sql.withTransaction(
  Effect.gen(function* () {
    yield* operation1()
    
    // Nested transaction (uses savepoint)
    yield* sql.withTransaction(
      Effect.gen(function* () {
        yield* operation2()
        yield* operation3()
      })
    )
    
    yield* operation4()
  })
)

Migrations

Migration Structure

// packages/persistence/src/Migrations/Migration0003_CreateAccounts.ts
import { Effect } from "effect"
import { SqlClient } from "@effect/sql"

/**
 * Migration 0003: creates the `accounts` table and its lookup indexes.
 *
 * Every statement uses IF NOT EXISTS, so running the migration again against
 * a database where it was already (fully or partly) applied does not fail on
 * an object that already exists.
 */
export const Migration0003_CreateAccounts = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient

  yield* sql`
    CREATE TABLE IF NOT EXISTS accounts (
      id TEXT PRIMARY KEY,
      company_id TEXT NOT NULL REFERENCES companies(id),
      account_number TEXT NOT NULL,
      name TEXT NOT NULL,
      account_type TEXT NOT NULL CHECK (account_type IN (
        'Asset', 'Liability', 'Equity', 'Revenue', 'Expense'
      )),
      category TEXT NOT NULL,
      normal_balance TEXT NOT NULL CHECK (normal_balance IN ('Debit', 'Credit')),
      parent_account_id TEXT REFERENCES accounts(id),
      level INTEGER NOT NULL DEFAULT 0,
      is_active BOOLEAN NOT NULL DEFAULT true,
      allow_manual_entries BOOLEAN NOT NULL DEFAULT true,
      require_department BOOLEAN NOT NULL DEFAULT false,
      require_project BOOLEAN NOT NULL DEFAULT false,
      currency_code TEXT,
      created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

      UNIQUE (company_id, account_number)
    )
  `

  // Foreign-key lookups: accounts of a company, children of a parent account.
  yield* sql`
    CREATE INDEX IF NOT EXISTS idx_accounts_company_id ON accounts(company_id)
  `

  yield* sql`
    CREATE INDEX IF NOT EXISTS idx_accounts_parent_id ON accounts(parent_account_id)
  `

  // Filtering by account type.
  yield* sql`
    CREATE INDEX IF NOT EXISTS idx_accounts_type ON accounts(account_type)
  `
})

Running Migrations

// packages/persistence/src/Layers/MigrationsLive.ts
import { Layer, Effect } from "effect"
import { SqlClient } from "@effect/sql"

/**
 * Applies every migration that is not yet recorded in the `migrations` table,
 * in list order.
 *
 * Each migration runs in its own transaction together with the INSERT that
 * records it. Either both happen or neither does, so a failure cannot leave a
 * migration applied but unrecorded (which would make the next run try it
 * again). This relies on PostgreSQL executing DDL transactionally.
 */
const runMigrations = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient

  // Create migrations table
  yield* sql`
    CREATE TABLE IF NOT EXISTS migrations (
      id TEXT PRIMARY KEY,
      applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    )
  `

  // Get applied migrations
  const applied = yield* sql`SELECT id FROM migrations`
  const appliedIds = new Set(applied.map((row) => row.id))

  // Run pending migrations in order
  const migrations = [
    { id: "0001", effect: Migration0001_CreateOrganizations },
    { id: "0002", effect: Migration0002_CreateCompanies },
    { id: "0003", effect: Migration0003_CreateAccounts },
    // ...
  ]

  for (const migration of migrations) {
    if (appliedIds.has(migration.id)) {
      continue
    }

    yield* Effect.log(`Running migration ${migration.id}`)
    yield* sql.withTransaction(
      Effect.gen(function* () {
        yield* migration.effect
        yield* sql`INSERT INTO migrations (id) VALUES (${migration.id})`
      })
    )
  }
})

/**
 * Layer that runs all pending migrations while it is being built and then
 * provides an empty object as the `Migrations` service, so depending on the
 * layer means "the schema is up to date".
 */
export const MigrationsLive = Layer.effect(
  Migrations,
  Effect.as(runMigrations, {})
)

Database Schema

Key Tables

organizations

-- Root of the ownership chain: companies.organization_id references this table.
CREATE TABLE organizations (
  id TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  reporting_currency TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

companies

-- A company belongs to exactly one organization and may point at a parent company.
CREATE TABLE companies (
  id TEXT PRIMARY KEY,
  organization_id TEXT NOT NULL REFERENCES organizations(id),
  name TEXT NOT NULL,
  legal_name TEXT NOT NULL,
  company_type TEXT NOT NULL,
  functional_currency TEXT NOT NULL,
  reporting_currency TEXT NOT NULL,
  jurisdiction TEXT NOT NULL,
  tax_id TEXT,
  fiscal_year_end_month INTEGER NOT NULL,
  fiscal_year_end_day INTEGER NOT NULL,
  -- Self-reference; nullable, so a company does not need a parent.
  parent_company_id TEXT REFERENCES companies(id),
  -- Nullable; DECIMAL(5,2) holds values up to 999.99.
  ownership_percentage DECIMAL(5,2),
  is_active BOOLEAN NOT NULL DEFAULT true,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes on both foreign keys.
CREATE INDEX idx_companies_organization ON companies(organization_id);
CREATE INDEX idx_companies_parent ON companies(parent_company_id);

accounts

-- NOTE(review): this listing differs from Migration0003_CreateAccounts shown
-- earlier on this page. The migration also has CHECK constraints on
-- account_type and normal_balance, the columns require_department,
-- require_project and currency_code, an index on account_type, and index names
-- idx_accounts_company_id / idx_accounts_parent_id. Confirm which one is current.
CREATE TABLE accounts (
  id TEXT PRIMARY KEY,
  company_id TEXT NOT NULL REFERENCES companies(id),
  account_number TEXT NOT NULL,
  name TEXT NOT NULL,
  account_type TEXT NOT NULL,
  category TEXT NOT NULL,
  normal_balance TEXT NOT NULL,
  -- Self-reference; nullable, so an account does not need a parent.
  parent_account_id TEXT REFERENCES accounts(id),
  level INTEGER NOT NULL DEFAULT 0,
  is_active BOOLEAN NOT NULL DEFAULT true,
  allow_manual_entries BOOLEAN NOT NULL DEFAULT true,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- Account numbers are unique per company, not globally.
  UNIQUE (company_id, account_number)
);

CREATE INDEX idx_accounts_company ON accounts(company_id);
CREATE INDEX idx_accounts_parent ON accounts(parent_account_id);

journal_entries

-- Journal entry header; its lines live in journal_entry_lines.
CREATE TABLE journal_entries (
  id TEXT PRIMARY KEY,
  company_id TEXT NOT NULL REFERENCES companies(id),
  entry_number TEXT NOT NULL,
  reference TEXT,
  description TEXT NOT NULL,
  entry_date DATE NOT NULL,
  -- Nullable, unlike entry_date.
  posting_date DATE,
  entry_type TEXT NOT NULL,
  status TEXT NOT NULL,
  -- The fiscal_periods table is referenced here but not listed on this page.
  fiscal_period_id TEXT NOT NULL REFERENCES fiscal_periods(id),
  currency_code TEXT NOT NULL,
  created_by TEXT NOT NULL,
  approved_by TEXT,
  posted_by TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- Entry numbers are unique per company, not globally.
  UNIQUE (company_id, entry_number)
);

CREATE INDEX idx_journal_entries_company ON journal_entries(company_id);
CREATE INDEX idx_journal_entries_date ON journal_entries(entry_date);
CREATE INDEX idx_journal_entries_period ON journal_entries(fiscal_period_id);
CREATE INDEX idx_journal_entries_status ON journal_entries(status);

journal_entry_lines

-- One debit or credit line of a journal entry.
CREATE TABLE journal_entry_lines (
  id TEXT PRIMARY KEY,
  -- Deleting a journal entry deletes its lines as well.
  journal_entry_id TEXT NOT NULL REFERENCES journal_entries(id) ON DELETE CASCADE,
  line_number INTEGER NOT NULL,
  account_id TEXT NOT NULL REFERENCES accounts(id),
  debit DECIMAL(19,4) NOT NULL DEFAULT 0,
  credit DECIMAL(19,4) NOT NULL DEFAULT 0,
  currency_code TEXT NOT NULL,
  exchange_rate DECIMAL(19,10),
  functional_currency_debit DECIMAL(19,4) NOT NULL DEFAULT 0,
  functional_currency_credit DECIMAL(19,4) NOT NULL DEFAULT 0,
  description TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- Line numbers are unique within one entry.
  UNIQUE (journal_entry_id, line_number),
  -- Amounts are never negative ...
  CHECK (debit >= 0 AND credit >= 0),
  -- ... and a line cannot carry a debit and a credit at the same time.
  -- A line where both are 0 still satisfies both checks.
  CHECK (NOT (debit > 0 AND credit > 0))
);

CREATE INDEX idx_journal_entry_lines_entry ON journal_entry_lines(journal_entry_id);
CREATE INDEX idx_journal_entry_lines_account ON journal_entry_lines(account_id);

Connection Pooling

// packages/persistence/src/Layers/PgClientLive.ts
import { PgClient } from "@effect/sql-pg"
import { Config, Layer, Duration } from "effect"

/**
 * PostgreSQL client layer configured from the environment.
 *
 * `PgClient.layer` takes plain option values; options that come from `Config`
 * (here the redacted DATABASE_URL) go through `PgClient.layerConfig`, where
 * every option is a `Config`. Constant settings are therefore wrapped in
 * `Config.succeed`.
 *
 * NOTE(review): the earlier version also passed `statementTimeout: 60s`.
 * PgClientConfig does not appear to have such an option. Confirm against the
 * installed @effect/sql-pg version and, if the limit is required, enforce it
 * on the database side (e.g. the `statement_timeout` setting).
 */
export const PgClientLive = PgClient.layerConfig({
  url: Config.redacted("DATABASE_URL"),
  // Upper bound on pooled connections.
  maxConnections: Config.succeed(20),
  idleTimeout: Config.succeed(Duration.seconds(30)),
  connectTimeout: Config.succeed(Duration.seconds(10))
})

Error Handling

Repository Errors

/**
 * Generic failure of a persistence operation.
 * `operation` names what was being attempted; `cause` carries the underlying
 * error unchanged.
 */
export class PersistenceError extends Schema.TaggedError<PersistenceError>()(
  "PersistenceError",
  {
    operation: Schema.String,
    cause: Schema.Unknown
  }
) {}

/**
 * Raised when an entity that was expected to exist could not be found.
 * `entity` is the kind of entity (e.g. "Account"), `id` its identifier.
 */
export class EntityNotFoundError extends Schema.TaggedError<EntityNotFoundError>()(
  "EntityNotFoundError",
  {
    entity: Schema.String,
    id: Schema.String
  }
) {
  /** Human-readable description built from the entity kind and id. */
  get message(): string {
    return `${this.entity} not found: ${this.id}`
  }
}

/**
 * Raised when an insert collides with an existing unique key.
 * `entity` is the kind of entity, `key` the value that already exists.
 */
export class DuplicateKeyError extends Schema.TaggedError<DuplicateKeyError>()(
  "DuplicateKeyError",
  {
    entity: Schema.String,
    key: Schema.String
  }
) {
  /** Human-readable description built from the entity kind and key. */
  get message(): string {
    return `${this.entity} already exists: ${this.key}`
  }
}

Mapping SQL Errors

/**
 * True when `error`, or the error stored in its `cause`, carries PostgreSQL
 * SQLSTATE 23505 (unique_violation). Checking the code is more reliable than
 * searching the message text, which depends on wording and locale and may
 * not be copied onto the wrapping error at all.
 */
const isUniqueViolation = (error: unknown): boolean => {
  const sqlStateOf = (candidate: unknown): unknown =>
    typeof candidate === "object" && candidate !== null && "code" in candidate
      ? candidate.code
      : undefined
  const cause =
    typeof error === "object" && error !== null && "cause" in error
      ? error.cause
      : undefined
  return sqlStateOf(error) === "23505" || sqlStateOf(cause) === "23505"
}

/**
 * Inserts an account and translates failures into repository errors:
 * a unique-key collision becomes `DuplicateKeyError`, everything else
 * (SQL failure, undecodable or missing returned row) `PersistenceError`.
 *
 * The returned rows are decoded explicitly with the `AccountRow` schema.
 * The `SqlSchema` helpers are not pipe operators: they take
 * `{ Request, Result, execute }` and return a query function.
 *
 * NOTE(review): `sql.insert(account)` assumes `account` is already in column
 * (snake_case) form; otherwise map the fields as the repository's `create` does.
 */
const create = (account: Account) =>
  sql`INSERT INTO accounts ${sql.insert(account)} RETURNING *`.pipe(
    // rows[0] is undefined when nothing was returned; decoding then fails
    // and is reported as a PersistenceError below.
    Effect.flatMap((rows) => Schema.decodeUnknown(AccountRow)(rows[0])),
    Effect.map(rowToAccount),
    Effect.mapError((error) =>
      isUniqueViolation(error)
        ? new DuplicateKeyError({
            entity: "Account",
            key: account.accountNumber
          })
        : new PersistenceError({
            operation: "create account",
            cause: error
          })
    )
  )

Testing with Testcontainers

import { PostgreSqlContainer } from "@testcontainers/postgresql"
import { inject } from "vitest"

// vitest.global-setup.ts

// Kept at module level so teardown() can reach the container that setup()
// started; a variable local to setup() is out of scope in teardown().
let container: Awaited<ReturnType<PostgreSqlContainer["start"]>> | undefined

/**
 * Starts a throwaway PostgreSQL container once for the whole test run and
 * publishes its connection URI under the key "dbUrl", where tests read it
 * with `inject("dbUrl")`.
 */
export async function setup({ provide }: { provide: (key: string, value: string) => void }) {
  container = await new PostgreSqlContainer("postgres:alpine").start()
  provide("dbUrl", container.getConnectionUri())
}

/** Stops the container started by `setup`, if there is one. */
export async function teardown() {
  await container?.stop()
  container = undefined
}

// test/utils.ts
/**
 * Client layer for tests, connected to the container started in global setup.
 *
 * Uses `PgClient.layer` rather than `Layer.effect` + `PgClient.make`: the
 * layer owns the connection's lifetime (so no Scope requirement leaks out)
 * and provides the generic `SqlClient` that the repositories ask for in
 * addition to `PgClient`. `Layer.suspend` defers reading the injected URL
 * until the layer is actually built.
 */
export const SharedPgClientLive = Layer.suspend(() =>
  PgClient.layer({ url: Redacted.make(inject("dbUrl") as string) })
)

// account.test.ts
// Repository and migrations side by side, both fed by the shared client.
// provideMerge also keeps the client itself available in the resulting layer.
const TestLayer = Layer.mergeAll(
  AccountRepositoryLive,
  MigrationsLive
).pipe(
  Layer.provideMerge(SharedPgClientLive)
)

// it.layer makes TestLayer's services available to every test in the group.
it.layer(TestLayer)("AccountRepository", (it) => {
  it.effect("creates and retrieves account", () =>
    Effect.gen(function*() {
      const repo = yield* AccountRepository
      // Round trip: what create() stored must be readable through getById().
      const account = yield* repo.create(testAccount)
      const retrieved = yield* repo.getById(orgId, account.id)
      expect(retrieved.name).toBe(testAccount.name)
    })
  )
})

Performance Optimization

Indexing Strategy

-- Foreign keys (always index)
CREATE INDEX idx_accounts_company ON accounts(company_id);

-- Common WHERE clauses
CREATE INDEX idx_journal_entries_date ON journal_entries(entry_date);
CREATE INDEX idx_journal_entries_status ON journal_entries(status);

-- Composite indexes for common queries
CREATE INDEX idx_journal_entries_company_date 
  ON journal_entries(company_id, entry_date);

-- Partial indexes for filtered queries
CREATE INDEX idx_accounts_active 
  ON accounts(company_id) 
  WHERE is_active = true;

Query Optimization

// Use SELECT * sparingly - specify columns
const findById = sql`
  SELECT id, name, account_type, is_active
  FROM accounts
  WHERE id = ${id}
`

// Use LIMIT for large result sets
// (columns are listed explicitly here too, in line with the advice above)
const findRecent = sql`
  SELECT id, entry_number, description, entry_date, status
  FROM journal_entries
  WHERE company_id = ${companyId}
  ORDER BY entry_date DESC
  LIMIT 100
`

// Use EXISTS for existence checks
// (the query yields one row holding a single boolean)
const exists = sql`
  SELECT EXISTS(
    SELECT 1 FROM accounts WHERE id = ${id}
  )
`

Next Steps

Build docs developers (and LLMs) love