Skip to main content
The @effect/sql-pg package provides a PostgreSQL client built on the pg library with full Effect integration.

Installation

npm install @effect/sql-pg pg

Quick Start

import { PgClient } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
import * as Sql from "effect/unstable/sql"
import * as Redacted from "effect/Redacted"

// Layer that provides the PostgreSQL client, built from the literal settings below.
const PgLive = PgClient.layer({
  host: "localhost",
  port: 5432,
  database: "mydb",
  username: "postgres",
  // Wrapped with Redacted.make instead of being passed as a plain string.
  password: Redacted.make("password")
})

// Query every row of `users` through the SqlClient service supplied by PgLive.
const program = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient
  const users = yield* sql`SELECT * FROM users`
  return users
}).pipe(
  Effect.provide(PgLive)
)

// Log the rows on success; report the failure instead of leaving the promise
// rejection unhandled (for example when the database is unreachable).
Effect.runPromise(program).then(console.log, console.error)

Configuration

Connection Options

import { PgClient } from "@effect/sql-pg"
import * as Duration from "effect/Duration"
import * as Redacted from "effect/Redacted"

// Reference listing of the options accepted by PgClient.layer.
const PgLive = PgClient.layer({
  // Connection details
  host: "localhost",
  port: 5432,
  database: "mydb",
  username: "postgres",
  password: Redacted.make("password"),
  
  // Or use a connection URL
  url: Redacted.make("postgresql://user:pass@localhost:5432/mydb"),
  
  // SSL configuration
  // NOTE(review): rejectUnauthorized: false turns off TLS certificate
  // verification in Node — confirm this is only used for local development.
  ssl: {
    rejectUnauthorized: false
  },
  
  // Connection pool settings
  maxConnections: 10,
  minConnections: 2,
  connectionTTL: Duration.minutes(5),
  idleTimeout: Duration.seconds(30),
  connectTimeout: Duration.seconds(5),
  
  // Application identification
  applicationName: "my-app",
  
  // Name transformation
  // Both functions below are the identity; replace them with a real mapper
  // (camelCase, etc.) to rename identifiers.
  transformResultNames: (str) => str, // camelCase, etc.
  transformQueryNames: (str) => str,
  
  // JSON handling
  transformJson: true,
  
  // Custom type parsing
  types: {
    // Override pg type parsers
  }
})

Connection from Config

Load configuration from Effect Config:
import { PgClient } from "@effect/sql-pg"
import * as Config from "effect/Config"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"

// Same client layer, but every setting is a Config keyed by the name shown
// instead of a literal value.
const PgLive = PgClient.layerConfig({
  // Reads from POSTGRES_HOST, POSTGRES_PORT, etc.
  host: Config.string("POSTGRES_HOST"),
  port: Config.number("POSTGRES_PORT"),
  database: Config.string("POSTGRES_DATABASE"),
  username: Config.string("POSTGRES_USERNAME"),
  // Config.redacted (not Config.string) is used for the password.
  password: Config.redacted("POSTGRES_PASSWORD")
})

Querying

Basic Queries

import * as Effect from "effect/Effect"
import * as Sql from "effect/unstable/sql"

// Fetch every row from the users table.
const getAllUsers = Effect.gen(function* () {
  const client = yield* Sql.SqlClient
  const rows = yield* client`SELECT * FROM users`
  return rows
})

// Look up users by id (sent as a bound parameter) and return the first row
// of the result.
const getUserById = (id: number) =>
  Effect.gen(function* () {
    const client = yield* Sql.SqlClient
    const [firstMatch] = yield* client`SELECT * FROM users WHERE id = ${id}`
    return firstMatch
  })

// Insert one user row; name and email are interpolated as query parameters.
const insertUser = (name: string, email: string) =>
  Effect.gen(function* () {
    const client = yield* Sql.SqlClient
    const insertStatement = client`
      INSERT INTO users (name, email)
      VALUES (${name}, ${email})
    `
    yield* insertStatement
  })

Returning Inserted Data

// Insert a user and hand back the row produced by RETURNING *.
const createUser = (name: string, email: string) =>
  Effect.gen(function* () {
    const client = yield* Sql.SqlClient
    const [inserted] = yield* client`
      INSERT INTO users (name, email)
      VALUES (${name}, ${email})
      RETURNING *
    `
    return inserted
  })

Batch Operations

// Insert all given users in a single statement via the sql.insert helper.
const insertMany = (users: Array<{ name: string; email: string }>) =>
  Effect.gen(function* () {
    const client = yield* Sql.SqlClient
    const rowsFragment = client.insert(users)
    yield* client`INSERT INTO users ${rowsFragment}`
  })

Transactions

Basic Transaction

// Move `amount` from account `from` to account `to`, running both updates
// inside a single transaction.
const transferFunds = (from: number, to: number, amount: number) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient

    // `sql` is only bound inside this generator, so the transaction wrapper
    // is applied here; a trailing .pipe(sql.withTransaction) on the outer
    // Effect.gen would reference `sql` out of scope.
    yield* Effect.gen(function* () {
      yield* sql`
        UPDATE accounts
        SET balance = balance - ${amount}
        WHERE id = ${from}
      `

      yield* sql`
        UPDATE accounts
        SET balance = balance + ${amount}
        WHERE id = ${to}
      `
    }).pipe(
      sql.withTransaction
    )
  })

Nested Transactions (Savepoints)

// Outer transaction that writes two log rows around a nested transaction
// whose failure is caught and ignored.
const complexOperation = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient

  // The outer transaction is applied inside the generator because `sql` is
  // not in scope in a .pipe() attached to the outermost Effect.gen.
  yield* Effect.gen(function* () {
    yield* sql`INSERT INTO logs (message) VALUES ('start')`

    yield* Effect.gen(function* () {
      yield* sql`INSERT INTO temp_data (value) VALUES (1)`

      // This might fail, but won't roll back the outer transaction
      yield* sql`INSERT INTO temp_data (value) VALUES (2)`
    }).pipe(
      sql.withTransaction,
      // Swallow the nested failure so the outer transaction can continue.
      Effect.catchAll(() => Effect.succeed(void 0))
    )

    yield* sql`INSERT INTO logs (message) VALUES ('complete')`
  }).pipe(
    sql.withTransaction
  )
})

PostgreSQL-Specific Features

JSON/JSONB Support

import { PgClient } from "@effect/sql-pg"
import * as Effect from "effect/Effect"

// Insert a JSON document with the pg.json helper, then query it back by a
// field inside the JSON column.
const program = Effect.gen(function* () {
  // `PgClient` is imported as a module namespace (see PgClient.layer), so the
  // service tag to yield is PgClient.PgClient.
  const pg = yield* PgClient.PgClient

  // Insert JSON data
  yield* pg`
    INSERT INTO documents (data)
    VALUES (${pg.json({ name: "John", age: 30 })})
  `

  // Query JSON fields
  const docs = yield* pg`
    SELECT * FROM documents
    WHERE data->>'name' = 'John'
  `

  return docs
})

LISTEN/NOTIFY

import { PgClient } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"

// Listen for notifications
// Subscribe to the "user_updates" channel and log every payload received.
const listenForUpdates = Effect.gen(function* () {
  // `PgClient` is imported as a module namespace, so the service tag is
  // PgClient.PgClient.
  const pg = yield* PgClient.PgClient

  // Yield the stream-consuming effect so it actually runs; returning it
  // would only hand back an un-run effect nested inside this one.
  yield* pg.listen("user_updates").pipe(
    Stream.runForEach((payload) =>
      Effect.sync(() => console.log("Notification:", payload))
    )
  )
})

// Send notification
// Publish a JSON-encoded { userId } payload on the "user_updates" channel.
const notifyUpdate = (userId: number) =>
  Effect.gen(function* () {
    // `PgClient` is imported as a module namespace, so the service tag is
    // PgClient.PgClient.
    const pg = yield* PgClient.PgClient
    yield* pg.notify("user_updates", JSON.stringify({ userId }))
  })

Arrays

// Select users whose tags column overlaps (&&) the given list, which is
// cast to text[] in the query.
const findUsersWithTags = (tags: string[]) =>
  Effect.gen(function* () {
    const client = yield* Sql.SqlClient
    const matches = yield* client`
      SELECT * FROM users
      WHERE tags && ${tags}::text[]
    `
    return matches
  })

Custom Types

import * as Pg from "pg"

// Client layer whose type parsing is delegated to the `pg` module's
// `Pg.types` registry.
const PgLive = PgClient.layer({
  host: "localhost",
  types: {
    // Parser lookup is passed straight through to pg.
    getTypeParser: Pg.types.getTypeParser,
    setTypeParser: (oid, parser) => {
      // Custom type parsing
      // Registers on the module-level Pg.types, not on a per-client copy.
      Pg.types.setTypeParser(oid, parser)
    }
  }
})

Streaming Results

For large result sets, use cursors:
import { PgClient } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"

// Stream the users table row by row and log each row as it arrives.
const streamAllUsers = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient

  // The row stream is the `.stream` property of a built statement, and the
  // consuming effect is yielded so it runs instead of being returned un-run.
  yield* sql`SELECT * FROM users`.stream.pipe(
    Stream.runForEach((user) =>
      Effect.sync(() => console.log(user))
    )
  )
})

Migrations

import { PgMigrator } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"

// Migrator layer that loads migrations from the ./migrations directory.
const MigratorLive = PgMigrator.layer({
  loader: PgMigrator.fromFileSystem("./migrations"),
  // NOTE(review): purpose of schemaDirectory is not shown here — presumably
  // where a schema dump is written; confirm against the PgMigrator docs.
  schemaDirectory: "sql/migrations"
})

// Runs the migrator with both the client layer and the migrator layer provided.
// NOTE(review): confirm that PgMigrator is yieldable as a service exposing
// run(); the import is also used as a namespace (PgMigrator.layer).
const migrate = Effect.gen(function* () {
  const migrator = yield* PgMigrator
  yield* migrator.run()
}).pipe(
  // NOTE(review): Layer.merge combines the layers side by side; if
  // MigratorLive itself needs the SQL client from PgLive, confirm whether
  // Layer.provideMerge is required instead.
  Effect.provide(Layer.merge(PgLive, MigratorLive))
)
Create migration files in ./migrations/:
-- 001_create_users.sql
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  email TEXT UNIQUE NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

Schema Definition

import * as SqlSchema from "effect/unstable/sql/SqlSchema"
import * as Schema from "effect/Schema"

// Schema-backed model for the users rows used by the insert and select
// helpers in this section.
// NOTE(review): DateTimeInsert / DateTimeUpdate presumably fill the timestamp
// automatically on insert / update — confirm against the SqlSchema docs.
class User extends SqlSchema.make({
  id: Schema.Int.pipe(Schema.primaryKey),
  name: Schema.String,
  email: Schema.String,
  createdAt: SqlSchema.DateTimeInsert,
  updatedAt: SqlSchema.DateTimeUpdate
}) {}

// Type-safe insert
// Insert a user whose shape is checked against User.jsonInsert.
const createUser = (user: typeof User.jsonInsert.Type) =>
  Effect.gen(function* () {
    const client = yield* Sql.SqlClient
    const rowFragment = client.insert(user)
    yield* client`INSERT INTO users ${rowFragment}`
  })

// Type-safe select
// Select all users and decode the raw rows into User instances.
const getUsers = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient
  return yield* sql`SELECT * FROM users`.pipe(
    // Schema.decodeUnknown yields an effectful decoder, so it is chained
    // with flatMap; Effect.map would leave the decode effect nested and
    // never run it.
    Effect.flatMap(Schema.decodeUnknown(Schema.Array(User)))
  )
})

Reactive Queries

Automatically re-execute queries when data changes:
import * as Stream from "effect/Stream"

// Build a reactive query for one user, keyed on ["users", id].
// The stream is returned to the caller rather than consumed here.
const watchUser = (id: number) =>
  Effect.gen(function* () {
    const client = yield* Sql.SqlClient
    const userQuery = client`SELECT * FROM users WHERE id = ${id}`

    return client.reactive(["users", id], userQuery)
  })

// Use the stream
// Obtain the reactive stream for user 1 and log every emission.
const program = Effect.gen(function* () {
  const updates = yield* watchUser(1)
  const logUpdate = (user: unknown) =>
    Effect.sync(() => console.log("User updated:", user))

  yield* updates.pipe(Stream.runForEach(logUpdate))
})

Error Handling

import { SqlError } from "effect/unstable/sql/SqlError"
import * as Effect from "effect/Effect"

// Query all users; on a SqlError, log the message and fall back to an
// empty list.
const safeQuery = Effect.gen(function* () {
  const client = yield* Sql.SqlClient
  const rows = yield* client`SELECT * FROM users`
  return rows
}).pipe(
  Effect.catchTag("SqlError", (sqlError) => {
    console.error("Database error:", sqlError.message)
    return Effect.succeed([])
  })
)

Best Practices

  1. Use connection pooling: Configure appropriate pool sizes
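  2. Parameterize queries: Always pass values through the sql tagged template (${value}) so they are sent as bound parameters; never build query strings by concatenation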
  3. Handle errors: Wrap queries in proper error handling
  4. Use transactions: For related operations that must succeed/fail together
  5. Don't close connections manually: Effect handles cleanup automatically with scopes, releasing the client when the scope that provided it closes
  6. Monitor performance: Use the built-in tracing support

Requirements

  • PostgreSQL 9.6 or higher
  • Node.js 18+ (for @effect/platform-node)
  • Effect 4.0.0 or higher

Next Steps

Build docs developers (and LLMs) love