Stream database mutations with cursor-based subscriptions and consumer groups
Kora’s Change Data Capture (CDC) system records every mutation in a per-shard ring buffer, enabling downstream consumers to subscribe to real-time change streams with at-least-once delivery guarantees.
Every write operation in a Kōra shard is recorded as a CdcEvent in a fixed-size, per-shard ring buffer. Downstream consumers read these events through cursor-tracked subscriptions or through consumer groups that provide at-least-once delivery with acknowledgement tracking.
pub enum CdcOp { Set, // A string key was created or overwritten Del, // A key was explicitly deleted Expire, // A key was removed by TTL expiration HSet, // A field was set inside a hash LPush, // A value was pushed to the head of a list RPush, // A value was pushed to the tail of a list SAdd, // A member was added to a set FlushDb, // The entire database was flushed}
# First pollresult = CDCPOLL 0 100last_seq = result[-1][0] # Get last sequence number# Next poll (continue from where we left off)CDCPOLL (last_seq + 1) 100
If the cursor is behind the ring buffer’s start_seq, you’ll receive events starting from the earliest available sequence. Check for gaps by comparing returned sequence numbers.
# Create group starting at sequence 0CDC.GROUP CREATE stream:mutations mygroup 0# Create group starting at latestCDC.GROUP CREATE stream:mutations backfill $
From kora-cdc/src/subscription.rs, subscriptions support glob-style patterns:
# Subscribe to all user keyspattern = "user:*"# Subscribe to specific shardpattern = "shard:0:*"# Subscribe to session keys in a namespacepattern = "app:session:*"
Pattern matching uses byte-level glob matching with * and ? wildcards. Filtering happens after events are retrieved from the ring buffer.
pub struct CdcEvent { /// Unique, monotonically increasing sequence number pub seq: u64, /// Wall-clock timestamp in milliseconds pub timestamp_ms: u64, /// The kind of mutation that occurred pub op: CdcOp, /// The affected key (empty for FlushDb) pub key: Vec<u8>, /// The new value, when applicable pub value: Option<Vec<u8>>,}
result = CDCPOLL cursor 100if result.gap: # Detected missed events log.warn(f"CDC gap detected, missed events before seq {result.next_seq}") # Re-sync from snapshot or accept data loss