The @effect/sql-pg package provides a PostgreSQL client built on the pg library with full Effect integration.
Installation
npm install @effect/sql-pg pg
Quick Start
import { PgClient } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
import * as Sql from "effect/unstable/sql"
import * as Redacted from "effect/Redacted"
// Layer built from explicit connection settings.
// The password is wrapped with Redacted.make before it is passed to the client.
const PgLive = PgClient.layer({
  host: "localhost",
  port: 5432,
  database: "mydb",
  username: "postgres",
  password: Redacted.make("password")
})
// Quick-start program: takes the SQL client from the environment, runs a
// single query and returns its rows. PgLive supplies the client.
const program = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient
  // Yielding the statement executes it and produces the rows.
  return yield* sql`SELECT * FROM users`
}).pipe(Effect.provide(PgLive))

// Run the program and print whatever it resolves with.
Effect.runPromise(program).then((rows) => console.log(rows))
Configuration
Connection Options
import { PgClient } from "@effect/sql-pg"
import * as Duration from "effect/Duration"
import * as Redacted from "effect/Redacted"
// Catalogue of the options accepted by PgClient.layer, grouped by purpose.
const PgLive = PgClient.layer({
  // Connection details
  host: "localhost",
  port: 5432,
  database: "mydb",
  username: "postgres",
  password: Redacted.make("password"),

  // Or use a connection URL
  url: Redacted.make("postgresql://user:pass@localhost:5432/mydb"),

  // SSL configuration
  ssl: {
    rejectUnauthorized: false
  },

  // Connection pool settings
  maxConnections: 10,
  minConnections: 2,
  connectionTTL: Duration.minutes(5),
  idleTimeout: Duration.seconds(30),
  connectTimeout: Duration.seconds(5),

  // Application identification
  applicationName: "my-app",

  // Name transformation (identity here; swap in camelCase etc.)
  transformResultNames: (name) => name,
  transformQueryNames: (name) => name,

  // JSON handling
  transformJson: true,

  // Custom type parsing
  types: {
    // Override pg type parsers
  }
})
Connection from Config
Load configuration from Effect Config:
import { PgClient } from "@effect/sql-pg"
import * as Config from "effect/Config"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
// Layer whose settings are resolved through Effect Config instead of literals.
// Each field names the configuration key it is read from
// (POSTGRES_HOST, POSTGRES_PORT, etc.).
const PgLive = PgClient.layerConfig({
  host: Config.string("POSTGRES_HOST"),
  port: Config.number("POSTGRES_PORT"),
  database: Config.string("POSTGRES_DATABASE"),
  username: Config.string("POSTGRES_USERNAME"),
  password: Config.redacted("POSTGRES_PASSWORD")
})
Querying
Basic Queries
import * as Effect from "effect/Effect"
import * as Sql from "effect/unstable/sql"
// Fetches every row of the users table.
const getAllUsers = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient
  const allUsers = yield* sql`SELECT * FROM users`
  return allUsers
})
// Looks up a user by id. The id is interpolated through the sql template.
// Yields the first matching row, or undefined when the result set is empty.
const getUserById = (id: number) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient
    const [firstMatch] = yield* sql`SELECT * FROM users WHERE id = ${id}`
    return firstMatch
  })
// Inserts one user row; the resulting effect produces no value.
const insertUser = (name: string, email: string) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient
    const insertStatement = sql`
INSERT INTO users (name, email)
VALUES (${name}, ${email})
`
    yield* insertStatement
  })
Returning Inserted Data
// Inserts a user and yields the row handed back by RETURNING *.
const createUser = (name: string, email: string) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient
    const [created] = yield* sql`
INSERT INTO users (name, email)
VALUES (${name}, ${email})
RETURNING *
`
    return created
  })
Batch Operations
// Inserts a batch of users with one statement built by sql.insert.
const insertMany = (users: Array<{ name: string; email: string }>) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient
    const rowsFragment = sql.insert(users)
    yield* sql`INSERT INTO users ${rowsFragment}`
  })
Transactions
Basic Transaction
// Moves `amount` from account `from` to account `to`.
// Both UPDATE statements run inside one sql.withTransaction call so they
// succeed or fail together.
const transferFunds = (from: number, to: number, amount: number) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient
    // withTransaction is applied here, inside the generator, because `sql`
    // only exists in this scope. It cannot be referenced from a .pipe()
    // attached to the outer Effect.gen.
    yield* sql.withTransaction(
      Effect.gen(function* () {
        yield* sql`
UPDATE accounts
SET balance = balance - ${amount}
WHERE id = ${from}
`
        yield* sql`
UPDATE accounts
SET balance = balance + ${amount}
WHERE id = ${to}
`
      })
    )
  })
Nested Transactions (Savepoints)
// Demonstrates a transaction nested inside another transaction.
// The inner block is wrapped in its own withTransaction and its failure is
// caught, so the surrounding transaction carries on.
const complexOperation = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient
  // The outer transaction is opened here, where `sql` is in scope. Referencing
  // `sql` from a .pipe() on the outer Effect.gen would not compile.
  yield* sql.withTransaction(
    Effect.gen(function* () {
      yield* sql`INSERT INTO logs (message) VALUES ('start')`
      yield* Effect.gen(function* () {
        yield* sql`INSERT INTO temp_data (value) VALUES (1)`
        // This might fail, but won't roll back the outer transaction
        yield* sql`INSERT INTO temp_data (value) VALUES (2)`
      }).pipe(
        sql.withTransaction,
        Effect.catchAll(() => Effect.succeed(void 0))
      )
      yield* sql`INSERT INTO logs (message) VALUES ('complete')`
    })
  )
})
PostgreSQL-Specific Features
JSON/JSONB Support
import { PgClient } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
// Stores a JSON document and then queries it back by one of its fields.
const program = Effect.gen(function* () {
  // `PgClient` is the module namespace (it is also used as PgClient.layer),
  // so the service tag is PgClient.PgClient.
  const pg = yield* PgClient.PgClient
  // Insert JSON data
  yield* pg`
INSERT INTO documents (data)
VALUES (${pg.json({ name: "John", age: 30 })})
`
  // Query JSON fields
  const docs = yield* pg`
SELECT * FROM documents
WHERE data->>'name' = 'John'
`
  return docs
})
LISTEN/NOTIFY
import { PgClient } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"
// Listen for notifications on the "user_updates" channel and log each payload.
const listenForUpdates = Effect.gen(function* () {
  // `PgClient` is the module namespace; the service tag is PgClient.PgClient.
  const pg = yield* PgClient.PgClient
  // The runForEach effect is yielded so the stream is actually consumed.
  // Returning it would hand the caller an un-run Effect and nothing would listen.
  yield* pg.listen("user_updates").pipe(
    Stream.runForEach((payload) =>
      Effect.sync(() => console.log("Notification:", payload))
    )
  )
})
// Send a notification on the "user_updates" channel.
// The payload is the JSON-serialised object { userId }.
const notifyUpdate = (userId: number) =>
  Effect.gen(function* () {
    // `PgClient` is the module namespace; the service tag is PgClient.PgClient.
    const pg = yield* PgClient.PgClient
    yield* pg.notify("user_updates", JSON.stringify({ userId }))
  })
Arrays
// Selects users whose tags column overlaps (&&) the supplied list.
// The list is interpolated as a parameter and cast to text[] in the query.
const findUsersWithTags = (tags: string[]) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient
    const matches = yield* sql`
SELECT * FROM users
WHERE tags && ${tags}::text[]
`
    return matches
  })
Custom Types
import * as Pg from "pg"
// Layer that routes type parsing through the parsers registered on pg's
// `types` object.
const PgLive = PgClient.layer({
  host: "localhost",
  types: {
    getTypeParser: Pg.types.getTypeParser,
    // Custom type parsing: forward each registration to pg.
    setTypeParser: (typeOid, parseFn) => {
      Pg.types.setTypeParser(typeOid, parseFn)
    }
  }
})
Streaming Results
For large result sets, use cursors:
import { PgClient } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"
// Streams the users table row by row and logs each row, instead of loading
// the whole result set into memory.
const streamAllUsers = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient
  // Streaming is exposed as the `.stream` property of a statement.
  // The runForEach effect is yielded so the stream is actually consumed;
  // returning it would leave an un-run Effect as the result.
  yield* sql`SELECT * FROM users`.stream.pipe(
    Stream.runForEach((user) =>
      Effect.sync(() => console.log(user))
    )
  )
})
Migrations
import { PgMigrator } from "@effect/sql-pg"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
// Migrator layer: migrations are loaded from ./migrations on the file system.
// NOTE(review): schemaDirectory points at a different path ("sql/migrations")
// than the loader; confirm both are intended.
const MigratorLive = PgMigrator.layer({
  loader: PgMigrator.fromFileSystem("./migrations"),
  schemaDirectory: "sql/migrations"
})
// Runs the migrations with both the client layer and the migrator layer provided.
// NOTE(review): PgMigrator is also used as a namespace (PgMigrator.layer);
// confirm it can be yielded as a service and that it exposes run().
const migrate = Effect.gen(function* () {
  const migrator = yield* PgMigrator
  yield* migrator.run()
}).pipe(Effect.provide(Layer.merge(PgLive, MigratorLive)))
./migrations/:
-- 001_create_users.sql
-- Creates the users table used by the examples.
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  email TEXT UNIQUE NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW()
);
Schema Definition
import * as SqlSchema from "effect/unstable/sql/SqlSchema"
import * as Schema from "effect/Schema"
// Schema-backed model for a user row: an integer primary key, two string
// columns and insert/update timestamp fields.
class User extends SqlSchema.make({
  id: Schema.Int.pipe(Schema.primaryKey),
  name: Schema.String,
  email: Schema.String,
  createdAt: SqlSchema.DateTimeInsert,
  updatedAt: SqlSchema.DateTimeUpdate
}) {}
// Type-safe insert: the argument is typed from the User model's jsonInsert
// variant and written with sql.insert.
const createUser = (user: typeof User.jsonInsert.Type) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient
    const rowFragment = sql.insert(user)
    yield* sql`INSERT INTO users ${rowFragment}`
  })
// Type-safe select: runs the query, then decodes the rows against
// Schema.Array(User).
const getUsers = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient
  return yield* sql`SELECT * FROM users`.pipe(
    // The decoder built by Schema.decodeUnknown is effectful, so it is
    // sequenced with flatMap. Effect.map would leave a nested, un-run Effect
    // as the result instead of the decoded users.
    Effect.flatMap(Schema.decodeUnknown(Schema.Array(User)))
  )
})
Reactive Queries
Automatically re-execute queries when data changes:
import * as Stream from "effect/Stream"
// Builds a reactive query for one user, registered under the keys ["users", id].
// The effect yields the value returned by sql.reactive, which the caller
// consumes as a stream.
const watchUser = (id: number) =>
  Effect.gen(function* () {
    const sql = yield* Sql.SqlClient
    const userQuery = sql`SELECT * FROM users WHERE id = ${id}`
    return sql.reactive(["users", id], userQuery)
  })
// Use the stream: obtain the reactive stream for user 1 and log every emission.
const program = Effect.gen(function* () {
  const userStream = yield* watchUser(1)
  const logUpdate = (user: unknown) =>
    Effect.sync(() => console.log("User updated:", user))
  yield* userStream.pipe(Stream.runForEach(logUpdate))
})
Error Handling
import { SqlError } from "effect/unstable/sql/SqlError"
import * as Effect from "effect/Effect"
// Query with a fallback: a failure tagged "SqlError" is logged and replaced
// by an empty result.
const safeQuery = Effect.gen(function* () {
  const sql = yield* Sql.SqlClient
  const rows = yield* sql`SELECT * FROM users`
  return rows
}).pipe(
  Effect.catchTag("SqlError", (sqlError) => {
    console.error("Database error:", sqlError.message)
    return Effect.succeed([])
  })
)
Best Practices
- Use connection pooling: Configure appropriate pool sizes
- Parameterize queries: Always pass values through the sql tagged template (e.g. ${id}) instead of concatenating them into the query string
- Handle errors: Wrap queries in proper error handling
- Use transactions: For related operations that must succeed/fail together
- Close connections: Effect handles cleanup automatically with scopes
- Monitor performance: Use the built-in tracing support
Requirements
- PostgreSQL 9.6 or higher
- Node.js 18+ (for @effect/platform-node)
- Effect 4.0.0 or higher