Skip to main content
GRDB offers native Combine support for publishing database values and events. Seamlessly integrate database operations into your Combine pipelines.

Quick Start

import Combine
import GRDB

// Observe database values
let publisher = ValueObservation
    .tracking { db in try Player.fetchAll(db) }
    .publisher(in: dbQueue)

let cancellable = publisher.sink(
    receiveCompletion: { completion in
        if case .failure(let error) = completion {
            print("Error: \(error)")
        }
    },
    receiveValue: { players in
        print("Fresh players: \(players)")
    }
)

Asynchronous Database Access

Reading from the Database

Publish a single value read from the database:
// DatabasePublishers.Read<[Player]>
let playersPublisher = dbQueue.readPublisher { db in
    try Player.fetchAll(db)
}

let cancellable = playersPublisher.sink(
    receiveCompletion: { completion in ... },
    receiveValue: { players in
        print("Players: \(players)")
    }
)
The publisher doesn’t access the database until it’s subscribed. It completes on the main queue by default.

Writing to the Database

Publish the result of a write operation:
// DatabasePublishers.Write<Void>
let writePublisher = dbQueue.writePublisher { db in
    try Player(name: "Alice", score: 1000).insert(db)
}

// DatabasePublishers.Write<Int>
let countPublisher = dbQueue.writePublisher { db -> Int in
    try Player(name: "Bob", score: 500).insert(db)
    return try Player.fetchCount(db)
}

let cancellable = countPublisher.sink(
    receiveCompletion: { completion in ... },
    receiveValue: { count in
        print("Player count: \(count)")
    }
)

Write-Then-Read Pattern

Optimize writes followed by reads:
let publisher = dbQueue.writePublisher(
    updates: { db in
        try Player(name: "Charlie", score: 750).insert(db)
    },
    thenRead: { db, _ in
        try Player.fetchAll(db)
    }
)
With DatabasePool, the thenRead closure sees the database state after the write but doesn’t block concurrent writes. This reduces contention.

Custom Scheduler

Control which thread receives values:
let backgroundQueue = DispatchQueue(label: "background")

let publisher = dbQueue.readPublisher(
    receiveOn: backgroundQueue,
    value: { db in try Player.fetchAll(db) }
)

// Values delivered on backgroundQueue
let cancellable = publisher.sink { players in
    // Process on background thread
}

Database Observation

ValueObservation Publisher

Track database changes with a Combine publisher:
let observation = ValueObservation.tracking { db in
    try Player.fetchAll(db)
}

// Publisher with output [Player] and failure Error
let publisher = observation.publisher(in: dbQueue)

let cancellable = publisher
    .sink(
        receiveCompletion: { completion in ... },
        receiveValue: { players in
            print("Fresh players: \(players)")
        }
    )
Publisher behavior:
  • Notifies an initial value before any changes
  • May coalesce multiple changes into one notification
  • May notify consecutive identical values (use removeDuplicates())
  • Only completes when cancelled (never completes naturally)
  • Delivers on the main thread by default

Scheduling Options

// Default: async on main thread
let publisher = observation.publisher(in: dbQueue)

// Immediate notification of initial value
let publisher = observation.publisher(
    in: dbQueue,
    scheduling: .immediate  // Must subscribe from main thread
)

let cancellable = publisher.sink { players in
    print("Players: \(players)")
}
// <- "Players" already printed here with .immediate
The .immediate scheduler requires subscription from the main thread. It raises a fatal error otherwise.

SharedValueObservation Publisher

Share a single observation among multiple subscribers:
let sharedObservation = ValueObservation
    .tracking { db in try Player.fetchAll(db) }
    .shared(in: dbQueue)

let publisher = sharedObservation.publisher()

// Multiple subscribers share the same database observation
let cancellable1 = publisher.sink { players in
    updateView1(players)
}

let cancellable2 = publisher.sink { players in
    updateView2(players)
}

DatabaseRegionObservation Publisher

Observe transactions that impact a database region:
let observation = DatabaseRegionObservation.tracking(Player.all())

// Publisher with output Database and failure Error
let publisher = observation.publisher(in: dbQueue)

let cancellable = publisher.sink(
    receiveCompletion: { completion in ... },
    receiveValue: { (db: Database) in
        print("Exclusive write access after players changed")
        // Perform queries with db
    }
)
The publisher delivers database connections in a protected dispatch queue, serialized with all database updates.

Migrations

Migrate databases asynchronously:
let migrator: DatabaseMigrator = /* ... */
let publisher = migrator.migratePublisher(dbQueue)

let cancellable = publisher.sink(
    receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Migrations complete")
        case .failure(let error):
            print("Migration failed: \(error)")
        }
    },
    receiveValue: {
        print("Migration succeeded")
    }
)

Combine Operators

Filter Duplicates

let publisher = ValueObservation
    .tracking { db in try Player.fetchAll(db) }
    .publisher(in: dbQueue)
    .removeDuplicates()
GRDB also provides observation.removeDuplicates() which is more efficient than the Combine operator.

Map Values

let countPublisher = ValueObservation
    .tracking { db in try Player.fetchAll(db) }
    .publisher(in: dbQueue)
    .map { $0.count }
    .eraseToAnyPublisher()

Debounce Changes

let debouncedPublisher = ValueObservation
    .tracking { db in try Player.fetchAll(db) }
    .publisher(in: dbQueue)
    .debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)

Catch Errors

let safePublisher = dbQueue.readPublisher { db in
    try Player.fetchAll(db)
}
.catch { error -> Just<[Player]> in
    print("Error: \(error), returning empty array")
    return Just([])
}

SwiftUI Integration

With @Published

class PlayerViewModel: ObservableObject {
    @Published var players: [Player] = []
    private var cancellable: AnyCancellable?
    
    init(dbQueue: DatabaseQueue) {
        cancellable = ValueObservation
            .tracking { db in try Player.fetchAll(db) }
            .publisher(in: dbQueue)
            .replaceError(with: [])
            .assign(to: &$players)
    }
}

struct PlayerListView: View {
    @StateObject var viewModel: PlayerViewModel
    
    var body: some View {
        List(viewModel.players) { player in
            Text(player.name)
        }
    }
}

With sink

class PlayerViewModel: ObservableObject {
    @Published var players: [Player] = []
    private var cancellable: AnyCancellable?
    
    init(dbQueue: DatabaseQueue) {
        cancellable = ValueObservation
            .tracking { db in try Player.fetchAll(db) }
            .publisher(in: dbQueue)
            .sink(
                receiveCompletion: { _ in },
                receiveValue: { [weak self] players in
                    self?.players = players
                }
            )
    }
}

Data Consistency

Don’t combine multiple database publishers with combineLatest or zip. This breaks data consistency because each publisher sees a different database state.

Incorrect Approach

// WRONG: Can produce inconsistent data
let totalCountPublisher = ValueObservation
    .tracking { db in try Player.fetchCount(db) }
    .publisher(in: dbQueue)

let topPlayersPublisher = ValueObservation
    .tracking { db in
        try Player.order(Column("score").desc).limit(10).fetchAll(db)
    }
    .publisher(in: dbQueue)

let combinedPublisher = totalCountPublisher
    .combineLatest(topPlayersPublisher)
    .map { count, players in
        HallOfFame(totalCount: count, topPlayers: players)
    }
// Assertion may fail: topPlayers.count may exceed totalCount!

Correct Approach

// CORRECT: Single observation guarantees consistency
struct HallOfFame {
    var totalCount: Int
    var topPlayers: [Player]
}

let publisher = ValueObservation
    .tracking { db -> HallOfFame in
        let totalCount = try Player.fetchCount(db)
        let topPlayers = try Player
            .order(Column("score").desc)
            .limit(10)
            .fetchAll(db)
        
        return HallOfFame(
            totalCount: totalCount,
            topPlayers: topPlayers
        )
    }
    .publisher(in: dbQueue)
// Guaranteed: topPlayers.count <= totalCount

Advanced Patterns

Refreshable Publisher

let refreshTrigger = PassthroughSubject<Void, Never>()

let publisher = refreshTrigger
    .flatMap { _ in
        dbQueue.readPublisher { db in
            try Player.fetchAll(db)
        }
    }
    .replaceError(with: [])

// Trigger refresh
refreshTrigger.send()

Batch Operations

let insertPublishers = players.map { player in
    dbQueue.writePublisher { db in
        try player.insert(db)
    }
}

let batchPublisher = Publishers.MergeMany(insertPublishers)
    .collect()
    .eraseToAnyPublisher()

let cancellable = batchPublisher.sink(
    receiveCompletion: { _ in print("All inserted") },
    receiveValue: { _ in }
)

Error Recovery

let publisher = dbQueue.writePublisher { db in
    try riskyOperation(db)
}
.retry(3)
.catch { error -> Just<DefaultValue> in
    logError(error)
    return Just(DefaultValue())
}

Testing

In-Memory Database for Tests

final class PlayerTests: XCTestCase {
    var dbQueue: DatabaseQueue!
    var cancellables: Set<AnyCancellable>!
    
    override func setUp() {
        dbQueue = try! DatabaseQueue()
        cancellables = []
        try! migrator.migrate(dbQueue)
    }
    
    func testPlayerObservation() throws {
        let expectation = expectation(description: "players")
        var receivedPlayers: [Player]?
        
        ValueObservation
            .tracking { db in try Player.fetchAll(db) }
            .publisher(in: dbQueue)
            .sink(
                receiveCompletion: { _ in },
                receiveValue: { players in
                    receivedPlayers = players
                    expectation.fulfill()
                }
            )
            .store(in: &cancellables)
        
        try dbQueue.write { db in
            try Player(name: "Test").insert(db)
        }
        
        waitForExpectations(timeout: 1)
        XCTAssertEqual(receivedPlayers?.count, 1)
    }
}

Best Practices

1

Use appropriate publishers

  • readPublisher/writePublisher for one-time operations
  • ValueObservation.publisher for ongoing observation
  • SharedValueObservation.publisher for multiple subscribers
2

Handle errors properly

Always provide error handling:
.sink(
    receiveCompletion: { completion in
        if case .failure(let error) = completion {
            // Handle error
        }
    },
    receiveValue: { value in ... }
)
3

Store cancellables

Keep cancellables alive or operations will stop:
class ViewModel {
    private var cancellables = Set<AnyCancellable>()
    
    func observe() {
        publisher
            .sink { ... }
            .store(in: &cancellables)
    }
}
4

Maintain data consistency

Use single observations for related data instead of combining multiple publishers.

Build docs developers (and LLMs) love