GRDB offers native Combine support for publishing database values and events. Seamlessly integrate database operations into your Combine pipelines.
Quick Start
import Combine
import GRDB
// Observe database values
// Publisher that emits the full list of players as the database changes.
let publisher = ValueObservation
    .tracking { db in try Player.fetchAll(db) }
    .publisher(in: dbQueue)

let cancellable = publisher.sink(
    receiveCompletion: { completion in
        switch completion {
        case .failure(let error):
            print("Error: \(error)")
        case .finished:
            // Nothing to report on normal completion.
            break
        }
    },
    receiveValue: { players in
        print("Fresh players: \(players)")
    }
)
Asynchronous Database Access
Reading from the Database
Publish a single value read from the database:
// DatabasePublishers.Read<[Player]>
let playersPublisher = dbQueue.readPublisher { db in
try Player.fetchAll(db)
}
let cancellable = playersPublisher.sink(
receiveCompletion: { completion in ... },
receiveValue: { players in
print("Players: \(players)")
}
)
The publisher doesn’t access the database until it’s subscribed. It completes on the main queue by default.
Writing to the Database
Publish the result of a write operation:
// DatabasePublishers.Write<Void>
let writePublisher = dbQueue.writePublisher { db in
try Player(name: "Alice", score: 1000).insert(db)
}
// DatabasePublishers.Write<Int>
let countPublisher = dbQueue.writePublisher { db -> Int in
try Player(name: "Bob", score: 500).insert(db)
return try Player.fetchCount(db)
}
let cancellable = countPublisher.sink(
receiveCompletion: { completion in ... },
receiveValue: { count in
print("Player count: \(count)")
}
)
Write-Then-Read Pattern
Optimize writes followed by reads:
let publisher = dbQueue.writePublisher(
updates: { db in
try Player(name: "Charlie", score: 750).insert(db)
},
thenRead: { db, _ in
try Player.fetchAll(db)
}
)
With DatabasePool, the thenRead closure sees the database state after the write but doesn’t block concurrent writes. This reduces contention.
Custom Scheduler
Control which thread receives values:
let backgroundQueue = DispatchQueue(label: "background")
let publisher = dbQueue.readPublisher(
    receiveOn: backgroundQueue,
    value: { db in try Player.fetchAll(db) }
)
// Values delivered on backgroundQueue.
// The read can fail, so the completion has to be handled:
// sink(receiveValue:) only exists for publishers whose Failure is Never.
let cancellable = publisher.sink(
    receiveCompletion: { completion in
        if case .failure(let error) = completion {
            print("Error: \(error)")
        }
    },
    receiveValue: { players in
        // Process on background thread
    }
)
Database Observation
ValueObservation Publisher
Track database changes with a Combine publisher:
let observation = ValueObservation.tracking { db in
try Player.fetchAll(db)
}
// Publisher with output [Player] and failure Error
let publisher = observation.publisher(in: dbQueue)
let cancellable = publisher
.sink(
receiveCompletion: { completion in ... },
receiveValue: { players in
print("Fresh players: \(players)")
}
)
Publisher behavior:
- Notifies an initial value before any changes
- May coalesce multiple changes into one notification
- May notify consecutive identical values (use
removeDuplicates())
- Never completes successfully: it ends only with an error, or when the subscription is cancelled
- Delivers on the main thread by default
Scheduling Options
// Default: async on main thread
let defaultPublisher = observation.publisher(in: dbQueue)

// Immediate notification of initial value
let publisher = observation.publisher(
    in: dbQueue,
    scheduling: .immediate // Must subscribe from main thread
)
// The observation can fail, so the completion has to be handled:
// sink(receiveValue:) only exists for publishers whose Failure is Never.
let cancellable = publisher.sink(
    receiveCompletion: { completion in
        if case .failure(let error) = completion {
            print("Error: \(error)")
        }
    },
    receiveValue: { players in
        print("Players: \(players)")
    }
)
// <- "Players" already printed here with .immediate
The .immediate scheduler requires subscription from the main thread. It raises a fatal error otherwise.
SharedValueObservation Publisher
Share a single observation among multiple subscribers:
let sharedObservation = ValueObservation
    .tracking { db in try Player.fetchAll(db) }
    .shared(in: dbQueue)
let publisher = sharedObservation.publisher()

// Multiple subscribers share the same database observation.
// The publisher can fail, so each subscriber handles the completion:
// sink(receiveValue:) only exists for publishers whose Failure is Never.
let cancellable1 = publisher.sink(
    receiveCompletion: { completion in
        if case .failure(let error) = completion {
            print("Error: \(error)")
        }
    },
    receiveValue: { players in
        updateView1(players)
    }
)
let cancellable2 = publisher.sink(
    receiveCompletion: { completion in
        if case .failure(let error) = completion {
            print("Error: \(error)")
        }
    },
    receiveValue: { players in
        updateView2(players)
    }
)
DatabaseRegionObservation Publisher
Observe transactions that impact a database region:
// DatabaseRegionObservation is created with its `init(tracking:)` initializer.
let observation = DatabaseRegionObservation(tracking: Player.all())
// Publisher with output Database and failure Error
let publisher = observation.publisher(in: dbQueue)
let cancellable = publisher.sink(
    receiveCompletion: { completion in
        if case .failure(let error) = completion {
            print("Error: \(error)")
        }
    },
    receiveValue: { (db: Database) in
        print("Exclusive write access after players changed")
        // Perform queries with db
    }
)
The publisher delivers database connections in a protected dispatch queue, serialized with all database updates.
Migrations
Migrate databases asynchronously:
let migrator: DatabaseMigrator = /* ... */
let publisher = migrator.migratePublisher(dbQueue)
let cancellable = publisher.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("Migrations complete")
case .failure(let error):
print("Migration failed: \(error)")
}
},
receiveValue: {
print("Migration succeeded")
}
)
Combine Operators
Filter Duplicates
let publisher = ValueObservation
.tracking { db in try Player.fetchAll(db) }
.publisher(in: dbQueue)
.removeDuplicates()
GRDB also provides observation.removeDuplicates() which is more efficient than the Combine operator.
Map Values
let countPublisher = ValueObservation
.tracking { db in try Player.fetchAll(db) }
.publisher(in: dbQueue)
.map { $0.count }
.eraseToAnyPublisher()
Debounce Changes
let debouncedPublisher = ValueObservation
.tracking { db in try Player.fetchAll(db) }
.publisher(in: dbQueue)
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
Catch Errors
let safePublisher = dbQueue.readPublisher { db in
try Player.fetchAll(db)
}
.catch { error -> Just<[Player]> in
print("Error: \(error), returning empty array")
return Just([])
}
SwiftUI Integration
With @Published
/// View model that republishes the players stored in the database
/// through its `players` property.
class PlayerViewModel: ObservableObject {
    @Published var players: [Player] = []

    /// Starts observing all players in `dbQueue`.
    /// - Parameter dbQueue: The database the players are read from.
    init(dbQueue: DatabaseQueue) {
        // assign(to:) returns Void and ties the subscription to the
        // lifetime of the @Published property, so there is no
        // AnyCancellable to store.
        ValueObservation
            .tracking { db in try Player.fetchAll(db) }
            .publisher(in: dbQueue)
            // assign(to:) requires a publisher that never fails.
            .replaceError(with: [])
            .assign(to: &$players)
    }
}
/// Lists the names of the players exposed by a `PlayerViewModel`.
struct PlayerListView: View {
    @StateObject var viewModel: PlayerViewModel

    var body: some View {
        // NOTE(review): this List initializer expects Player to be
        // Identifiable — confirm against the Player declaration.
        List(viewModel.players) { Text($0.name) }
    }
}
With sink
/// View model that keeps `players` in sync with the database by
/// subscribing with `sink`.
class PlayerViewModel: ObservableObject {
    @Published var players: [Player] = []
    // Keeps the observation alive for as long as the view model lives.
    private var cancellable: AnyCancellable?

    /// Starts observing all players in `dbQueue`.
    /// - Parameter dbQueue: The database the players are read from.
    init(dbQueue: DatabaseQueue) {
        cancellable = ValueObservation
            .tracking { db in try Player.fetchAll(db) }
            .publisher(in: dbQueue)
            .sink(
                receiveCompletion: { completion in
                    // Report failures instead of silently dropping them.
                    if case .failure(let error) = completion {
                        print("Error: \(error)")
                    }
                },
                // Weak capture: the stored cancellable must not keep
                // the view model alive through this closure.
                receiveValue: { [weak self] players in
                    self?.players = players
                }
            )
    }
}
Data Consistency
Don’t combine multiple database publishers with combineLatest or zip. This breaks data consistency because each publisher reads the database at a different time and may therefore see a different database state.
Incorrect Approach
// WRONG: Can produce inconsistent data
let totalCountPublisher = ValueObservation
.tracking { db in try Player.fetchCount(db) }
.publisher(in: dbQueue)
let topPlayersPublisher = ValueObservation
.tracking { db in
try Player.order(Column("score").desc).limit(10).fetchAll(db)
}
.publisher(in: dbQueue)
let combinedPublisher = totalCountPublisher
.combineLatest(topPlayersPublisher)
.map { count, players in
HallOfFame(totalCount: count, topPlayers: players)
}
// Assertion may fail: topPlayers.count may exceed totalCount!
Correct Approach
// CORRECT: Single observation guarantees consistency
/// Player count together with the highest-scoring players.
struct HallOfFame {
/// Total number of players.
var totalCount: Int
/// Players with the highest scores.
var topPlayers: [Player]
}
let publisher = ValueObservation
    .tracking { db -> HallOfFame in
        // The count and the top players are fetched in the same
        // tracking closure, so they belong to one observation.
        let topTenRequest = Player
            .order(Column("score").desc)
            .limit(10)
        return try HallOfFame(
            totalCount: Player.fetchCount(db),
            topPlayers: topTenRequest.fetchAll(db)
        )
    }
    .publisher(in: dbQueue)
// Guaranteed: topPlayers.count <= totalCount
Advanced Patterns
Refreshable Publisher
let refreshTrigger = PassthroughSubject<Void, Never>()
let publisher = refreshTrigger
    .flatMap { _ in
        dbQueue
            .readPublisher { db in
                try Player.fetchAll(db)
            }
            // Recover inside flatMap: replacing the error on the outer
            // pipeline would complete it after the first failed read,
            // and every later refresh would be ignored.
            .replaceError(with: [])
    }
// Trigger refresh.
// A PassthroughSubject drops values sent while it has no subscriber,
// so subscribe to `publisher` before sending.
refreshTrigger.send()
Batch Operations
// One write publisher per player; each one performs its own insert.
let insertPublishers = players.map { player in
    dbQueue.writePublisher { db in
        try player.insert(db)
    }
}
// Merge the individual writes and wait until all of them have finished.
let batchPublisher = Publishers.MergeMany(insertPublishers)
    .collect()
    .eraseToAnyPublisher()
let cancellable = batchPublisher.sink(
    receiveCompletion: { completion in
        // Only report success when the batch actually finished;
        // a failed insert ends the merged publisher with an error.
        switch completion {
        case .finished:
            print("All inserted")
        case .failure(let error):
            print("Batch insert failed: \(error)")
        }
    },
    receiveValue: { _ in }
)
Error Recovery
let publisher = dbQueue.writePublisher { db in
try riskyOperation(db)
}
.retry(3)
.catch { error -> Just<DefaultValue> in
logError(error)
return Just(DefaultValue())
}
Testing
In-Memory Database for Tests
/// Tests database observation against a fresh in-memory database.
final class PlayerTests: XCTestCase {
    var dbQueue: DatabaseQueue!
    var cancellables: Set<AnyCancellable>!

    /// Creates a migrated in-memory database for each test.
    /// Throwing lets XCTest report a setup failure instead of crashing
    /// the whole test run on `try!`.
    override func setUpWithError() throws {
        try super.setUpWithError()
        dbQueue = try DatabaseQueue()
        cancellables = []
        try migrator.migrate(dbQueue)
    }

    /// Releases the subscriptions and the database between tests.
    override func tearDown() {
        cancellables = nil
        dbQueue = nil
        super.tearDown()
    }

    /// Checks that the observation publishes the inserted player.
    func testPlayerObservation() throws {
        let playerReceived = self.expectation(description: "players")
        var receivedPlayers: [Player]?

        ValueObservation
            .tracking { db in try Player.fetchAll(db) }
            .publisher(in: dbQueue)
            .sink(
                receiveCompletion: { completion in
                    if case .failure(let error) = completion {
                        XCTFail("Observation failed: \(error)")
                    }
                },
                receiveValue: { players in
                    // The initial value may be fetched before the
                    // insert below and be empty. Only the value that
                    // contains the player fulfills the expectation.
                    guard !players.isEmpty else { return }
                    receivedPlayers = players
                    playerReceived.fulfill()
                }
            )
            .store(in: &cancellables)

        try dbQueue.write { db in
            try Player(name: "Test", score: 0).insert(db)
        }

        wait(for: [playerReceived], timeout: 1)
        XCTAssertEqual(receivedPlayers?.count, 1)
    }
}
Best Practices
Use appropriate publishers
readPublisher/writePublisher for one-time operations
ValueObservation.publisher for ongoing observation
SharedValueObservation.publisher for multiple subscribers
Handle errors properly
Always provide error handling:
.sink(
receiveCompletion: { completion in
if case .failure(let error) = completion {
// Handle error
}
},
receiveValue: { value in ... }
)
Store cancellables
Keep cancellables alive, or the subscription is cancelled and operations stop:
class ViewModel {
private var cancellables = Set<AnyCancellable>()
func observe() {
publisher
.sink { ... }
.store(in: &cancellables)
}
}
Maintain data consistency
Use single observations for related data instead of combining multiple publishers.