Kora’s Change Data Capture (CDC) system records every mutation in a per-shard ring buffer, enabling downstream consumers to subscribe to real-time change streams with at-least-once delivery guarantees.

Overview

From kora-cdc/src/lib.rs:1-17:
Every write operation in a Kōra shard is recorded as a CdcEvent in a fixed-size, per-shard ring buffer. Downstream consumers read these events through cursor-tracked subscriptions or through consumer groups that provide at-least-once delivery with acknowledgement tracking.

Features

  • Per-shard ring buffers with configurable capacity
  • Monotonic sequence numbers for ordered event delivery
  • Cursor-based subscriptions with gap detection
  • Consumer groups for distributed processing
  • Glob-pattern filtering to subscribe to specific keys
  • Pending entry tracking with acknowledgement

Architecture

  1. Write Path: Every mutation (SET, DEL, LPUSH, etc.) generates a CdcEvent with a unique sequence number, which is appended to the shard’s ring buffer.
  2. Ring Buffer: A fixed-capacity circular buffer that overwrites the oldest events when full. Consumers detect gaps by checking sequence numbers.
  3. Subscription: Consumers track their position with a cursor (a sequence number) and poll for new events in batches.
  4. Acknowledgement: Consumer groups track pending entries and require explicit ACKs to advance the group cursor.

Event Types

From kora-cdc/src/ring.rs:8-27:
pub enum CdcOp {
    Set,      // A string key was created or overwritten
    Del,      // A key was explicitly deleted
    Expire,   // A key was removed by TTL expiration
    HSet,     // A field was set inside a hash
    LPush,    // A value was pushed to the head of a list
    RPush,    // A value was pushed to the tail of a list
    SAdd,     // A member was added to a set
    FlushDb,  // The entire database was flushed
}
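The CDCPOLL reply reports the operation as a string such as "Set", and CdcEvent.key is empty for FlushDb. The following minimal sketch adds two helper methods. It assumes that every variant is reported by its identifier, as "Set" is. The names `wire_name` and `has_key` are hypothetical and are not part of kora-cdc.

```rust
/// Mirror of the documented enum so this sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp {
    Set,
    Del,
    Expire,
    HSet,
    LPush,
    RPush,
    SAdd,
    FlushDb,
}

impl CdcOp {
    /// Hypothetical: the name a CDCPOLL reply would use for this operation,
    /// assuming each variant is reported by its identifier, as "Set" is.
    pub fn wire_name(self) -> &'static str {
        match self {
            CdcOp::Set => "Set",
            CdcOp::Del => "Del",
            CdcOp::Expire => "Expire",
            CdcOp::HSet => "HSet",
            CdcOp::LPush => "LPush",
            CdcOp::RPush => "RPush",
            CdcOp::SAdd => "SAdd",
            CdcOp::FlushDb => "FlushDb",
        }
    }

    /// Whether events of this kind name a key. FlushDb clears the whole
    /// database, so CdcEvent.key is empty for it.
    pub fn has_key(self) -> bool {
        !matches!(self, CdcOp::FlushDb)
    }
}

fn main() {
    for op in [CdcOp::Set, CdcOp::Expire, CdcOp::FlushDb] {
        println!("{:<8} has_key = {}", op.wire_name(), op.has_key());
    }
}
```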

Simple Polling

CDCPOLL Command

The simplest way to consume CDC events is direct polling:
# Poll from sequence 0, get up to 100 events
CDCPOLL 0 100
Response format:
[
  [
    12345,              // sequence number
    1678901234567,      // timestamp (ms)
    "Set",              // operation type
    "user:1000",        // key
    "{\"name\":\"Alice\"}"  // value (if applicable)
  ],
  [...]
]

Cursor Tracking

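# First poll
result = CDCPOLL 0 100
last_seq = result[-1][0]  # Sequence number of the last event returned (skip if result is empty)

# Next poll (continue from where we left off)
CDCPOLL (last_seq + 1) 100
If the cursor is behind the ring buffer’s start_seq, the reply starts at the earliest sequence still available. To detect the resulting gap, compare the first returned sequence number with the cursor you requested. If the returned number is larger, the events in between were overwritten before you read them.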
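The cursor bookkeeping above can be sketched in Rust. This sketch assumes that each CDCPOLL row has been decoded into a tuple whose first element is the sequence number, matching the response format. `PollRow`, `Advance` and `advance_cursor` are hypothetical names. The sketch reports a gap whenever the first returned sequence is past the requested cursor.

```rust
/// Hypothetical decoded CDCPOLL row: (seq, timestamp_ms, op, key, value).
type PollRow = (u64, u64, String, Vec<u8>, Option<Vec<u8>>);

/// Result of one poll from the consumer's point of view.
#[derive(Debug, PartialEq)]
pub struct Advance {
    /// Cursor to pass to the next CDCPOLL call.
    pub next_cursor: u64,
    /// How many requested events were overwritten before we read them.
    pub missed: u64,
}

/// Compute the next cursor and detect a gap for a batch polled at `cursor`.
pub fn advance_cursor(cursor: u64, batch: &[PollRow]) -> Advance {
    match (batch.first(), batch.last()) {
        (Some(first), Some(last)) => Advance {
            // Continue one past the last event received.
            next_cursor: last.0 + 1,
            // If the reply starts later than requested, the skipped events are lost.
            missed: first.0.saturating_sub(cursor),
        },
        // Empty batch: nothing new yet, so poll again from the same place.
        _ => Advance { next_cursor: cursor, missed: 0 },
    }
}

fn main() {
    let row = |seq: u64| -> PollRow { (seq, 0, "Set".to_string(), b"user:1".to_vec(), None) };
    // Requested seq 5, but the oldest retained event is 8: events 5..=7 are gone.
    let adv = advance_cursor(5, &[row(8), row(9), row(10)]);
    println!("next cursor = {}, missed = {}", adv.next_cursor, adv.missed);
}
```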

Consumer Groups

Consumer groups provide distributed, at-least-once event processing with automatic redelivery.

Create a Group

# Create group starting at sequence 0
CDC.GROUP CREATE stream:mutations mygroup 0

# Create group starting at latest
CDC.GROUP CREATE stream:mutations backfill $
Parameters:
  • key (string, required): Stream key name (an arbitrary identifier)
  • group (string, required): Consumer group name
  • start_seq (integer, required): Starting sequence number, or $ for the latest
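The following sketch shows how a server might interpret the start_seq argument. It assumes that `$` resolves to the ring's current write_seq (the next sequence to be written), so the group only sees events written after it is created. `resolve_start_seq` is a hypothetical helper.

```rust
/// Hypothetical: turn the CDC.GROUP CREATE start_seq argument into a sequence.
/// `write_seq` is the ring's next sequence to be written.
pub fn resolve_start_seq(arg: &str, write_seq: u64) -> Result<u64, String> {
    if arg == "$" {
        // "Latest": skip everything already in the buffer.
        Ok(write_seq)
    } else {
        arg.parse::<u64>()
            .map_err(|_| format!("start_seq must be an integer or $, got {arg:?}"))
    }
}

fn main() {
    let write_seq = 12_351;
    for arg in ["0", "$", "latest"] {
        println!("{arg} -> {:?}", resolve_start_seq(arg, write_seq));
    }
}
```

A group created at 0 after the ring has wrapped simply reads from start_seq on its first poll and sees a gap, because sequence 0 is no longer retained.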

Read from Group

# Consumer 'worker-1' reads 10 events
CDC.GROUP READ stream:mutations mygroup worker-1 10
Events returned are marked as pending until acknowledged.

Acknowledge Events

# Acknowledge specific sequences
CDC.ACK stream:mutations mygroup 12345 12346 12347
# => 3 (number acknowledged)

Check Pending

# List pending entries
CDC.PENDING stream:mutations mygroup
Response:
[
  [12348, "worker-1", 30000],  // [seq, consumer, idle_time_ms]
  [12349, "worker-1", 30000],
  [12350, "worker-2", 5000]
]
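The read, ack and pending cycle can be modelled as a per-group table of pending entries. The following is a minimal in-memory sketch, not kora-cdc's implementation. `ConsumerGroup` and its methods are hypothetical, and sequence numbers stand in for full events. Time is passed in as milliseconds so the example stays deterministic. The sketch assumes the group keeps two positions:

  • a delivery cursor, which `read` advances so that workers receive different events
  • a committed cursor, which moves only when ACKs arrive

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory consumer group (sequences stand in for events).
pub struct ConsumerGroup {
    /// Next sequence not yet delivered to any consumer in the group.
    next_seq: u64,
    /// Pending entries: seq -> (consumer, delivery time in ms).
    pending: BTreeMap<u64, (String, u64)>,
}

impl ConsumerGroup {
    pub fn new(start_seq: u64) -> Self {
        Self { next_seq: start_seq, pending: BTreeMap::new() }
    }

    /// Deliver up to `count` undelivered sequences below `write_seq` to
    /// `consumer` and mark each one pending.
    pub fn read(&mut self, consumer: &str, count: u64, write_seq: u64, now_ms: u64) -> Vec<u64> {
        let end = write_seq.min(self.next_seq.saturating_add(count)).max(self.next_seq);
        let seqs: Vec<u64> = (self.next_seq..end).collect();
        for &seq in &seqs {
            self.pending.insert(seq, (consumer.to_string(), now_ms));
        }
        self.next_seq = end;
        seqs
    }

    /// Acknowledge sequences; returns how many were actually pending.
    pub fn ack(&mut self, seqs: &[u64]) -> usize {
        let mut acked = 0;
        for seq in seqs {
            if self.pending.remove(seq).is_some() {
                acked += 1;
            }
        }
        acked
    }

    /// Pending entries as (seq, consumer, idle_time_ms), lowest seq first.
    pub fn pending(&self, now_ms: u64) -> Vec<(u64, String, u64)> {
        self.pending
            .iter()
            .map(|(&seq, (consumer, delivered_ms))| {
                (seq, consumer.clone(), now_ms.saturating_sub(*delivered_ms))
            })
            .collect()
    }

    /// Committed cursor: the lowest unacknowledged sequence.
    pub fn committed_cursor(&self) -> u64 {
        self.pending.keys().next().copied().unwrap_or(self.next_seq)
    }
}

fn main() {
    let mut group = ConsumerGroup::new(12_345);
    let w1 = group.read("worker-1", 3, 12_351, 0);
    let w2 = group.read("worker-2", 3, 12_351, 25_000);
    println!("worker-1: {w1:?}, worker-2: {w2:?}");
    println!("acked: {}", group.ack(&[12_345, 12_346, 12_347]));
    println!("pending: {:?}", group.pending(30_000));
    println!("committed cursor: {}", group.committed_cursor());
}
```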

Pattern Filtering

From kora-cdc/src/subscription.rs, subscriptions support glob-style patterns:
# Subscribe to all user keys
pattern = "user:*"

# Subscribe to keys under the shard:0: prefix (patterns match key names)
pattern = "shard:0:*"

# Subscribe to session keys in a namespace
pattern = "app:session:*"
Pattern matching uses byte-level glob matching with * and ? wildcards. Filtering happens after events are retrieved from the ring buffer.
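The following minimal sketch implements byte-level glob matching with `*` and `?`. It uses the common greedy-with-backtracking approach. `glob_match` is a hypothetical name, and kora-cdc's matcher may differ. For example, this sketch has no escapes or character classes.

```rust
/// Byte-level glob match: `*` matches any run of bytes (including none),
/// `?` matches exactly one byte, and every other byte must match literally.
pub fn glob_match(pattern: &[u8], key: &[u8]) -> bool {
    let (mut p, mut k) = (0usize, 0usize);
    // Most recent `*` in the pattern and the key position it was tried at.
    let mut star: Option<(usize, usize)> = None;

    while k < key.len() {
        if p < pattern.len() && pattern[p] == b'*' {
            // Let `*` match nothing for now; remember where to resume.
            star = Some((p, k));
            p += 1;
        } else if p < pattern.len() && (pattern[p] == b'?' || pattern[p] == key[k]) {
            p += 1;
            k += 1;
        } else if let Some((sp, sk)) = star {
            // Backtrack: let the last `*` swallow one more byte.
            p = sp + 1;
            k = sk + 1;
            star = Some((sp, sk + 1));
        } else {
            return false;
        }
    }
    // Whatever is left of the pattern must be `*` only.
    pattern[p..].iter().all(|&b| b == b'*')
}

fn main() {
    let keys: [&[u8]; 3] = [b"user:1000", b"app:session:abc", b"order:7"];
    for key in keys {
        println!("{} -> {}", String::from_utf8_lossy(key), glob_match(b"user:*", key));
    }
}
```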

Event Structure

From kora-cdc/src/ring.rs:34-46:
pub struct CdcEvent {
    /// Unique, monotonically increasing sequence number
    pub seq: u64,
    /// Wall-clock timestamp in milliseconds
    pub timestamp_ms: u64,
    /// The kind of mutation that occurred
    pub op: CdcOp,
    /// The affected key (empty for FlushDb)
    pub key: Vec<u8>,
    /// The new value, when applicable
    pub value: Option<Vec<u8>>,
}

Ring Buffer Internals

From kora-cdc/src/ring.rs:56-104:
pub struct CdcRing {
    buffer: Vec<Option<CdcEvent>>,
    capacity: usize,
    write_seq: u64,   // Next sequence to write
    start_seq: u64,   // Earliest available sequence
}

impl CdcRing {
    pub fn push(&mut self, op: CdcOp, key: Vec<u8>, value: Option<Vec<u8>>, timestamp_ms: u64) {
        let seq = self.write_seq;
        let idx = (seq as usize) % self.capacity;

        self.buffer[idx] = Some(CdcEvent { seq, timestamp_ms, op, key, value });
        self.write_seq += 1;

        // Update start_seq when buffer wraps
        if self.write_seq > self.capacity as u64 {
            self.start_seq = self.write_seq - self.capacity as u64;
        }
    }
}
Ring buffers provide:
  • Bounded memory: Fixed size regardless of event volume
  • O(1) writes: No allocation or compaction
  • Automatic cleanup: Old events are overwritten
  • Gap detection: Consumers know when they fall behind
Trade-off: Slow consumers may miss events. Size the buffer based on expected event rate and consumer lag tolerance.

Gap Detection

From kora-cdc/src/ring.rs:131-149:
pub fn read(&self, from_seq: u64, limit: usize) -> CdcReadResult {
    if from_seq >= self.write_seq {
        return CdcReadResult {
            events: vec![],
            next_seq: self.write_seq,
            gap: false,
        };
    }
    
    let actual_start = if from_seq < self.start_seq {
        self.start_seq  // Consumer fell behind
    } else {
        from_seq
    };
    
    let gap = from_seq < self.start_seq;
    // ...
}
Gap handling:
result = CDCPOLL cursor 100

if result.gap:
    # Events from cursor up to the first returned sequence were overwritten
    log.warn(f"CDC gap detected: events {cursor}..{result.events[0].seq - 1} were lost")
    # Re-sync from a snapshot or accept the data loss

cursor = result.next_seq
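The following self-contained version of the ring completes `read` so you can experiment with it. The excerpt above elides the body after the gap check. The copying loop, the `next_seq` computation and the trimmed-down `CdcOp` are therefore assumptions. `new` pre-fills the buffer with `None` so that indexing in `push` is always in bounds.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp { Set, Del } // trimmed for the sketch

#[derive(Debug, Clone, PartialEq)]
pub struct CdcEvent {
    pub seq: u64,
    pub timestamp_ms: u64,
    pub op: CdcOp,
    pub key: Vec<u8>,
    pub value: Option<Vec<u8>>,
}

#[derive(Debug)]
pub struct CdcReadResult {
    pub events: Vec<CdcEvent>,
    pub next_seq: u64,
    pub gap: bool,
}

pub struct CdcRing {
    buffer: Vec<Option<CdcEvent>>,
    capacity: usize,
    write_seq: u64, // next sequence to write
    start_seq: u64, // earliest available sequence
}

impl CdcRing {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "ring capacity must be non-zero");
        Self { buffer: vec![None; capacity], capacity, write_seq: 0, start_seq: 0 }
    }

    fn slot(&self, seq: u64) -> usize {
        // Reduce in u64 before narrowing, so 32-bit targets index correctly.
        (seq % self.capacity as u64) as usize
    }

    pub fn push(&mut self, op: CdcOp, key: Vec<u8>, value: Option<Vec<u8>>, timestamp_ms: u64) {
        let seq = self.write_seq;
        let idx = self.slot(seq);
        self.buffer[idx] = Some(CdcEvent { seq, timestamp_ms, op, key, value });
        self.write_seq += 1;
        if self.write_seq > self.capacity as u64 {
            self.start_seq = self.write_seq - self.capacity as u64;
        }
    }

    pub fn read(&self, from_seq: u64, limit: usize) -> CdcReadResult {
        if from_seq >= self.write_seq {
            return CdcReadResult { events: vec![], next_seq: self.write_seq, gap: false };
        }
        let gap = from_seq < self.start_seq; // consumer fell behind
        let actual_start = from_seq.max(self.start_seq);
        // Assumed: copy up to `limit` retained events in sequence order.
        let end = self.write_seq.min(actual_start.saturating_add(limit as u64));
        let events = (actual_start..end)
            .filter_map(|seq| self.buffer[self.slot(seq)].clone())
            .collect();
        CdcReadResult { events, next_seq: end, gap }
    }
}

fn main() {
    let mut ring = CdcRing::new(4);
    for i in 0..5u64 {
        ring.push(CdcOp::Set, format!("user:{i}").into_bytes(), Some(b"v".to_vec()), 1_000 + i);
    }
    ring.push(CdcOp::Del, b"user:0".to_vec(), None, 1_005);
    // Six writes into four slots: reading from 0 reports a gap.
    let r = ring.read(0, 10);
    let seqs: Vec<u64> = r.events.iter().map(|e| e.seq).collect();
    println!("gap = {}, seqs = {seqs:?}, next_seq = {}", r.gap, r.next_seq);
}
```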

Use Cases

Cache Invalidation

Invalidate downstream caches when keys change.
CDC.GROUP CREATE mutations cache-invalidator 0
# Worker invalidates CDN/browser caches

Search Index Sync

Keep search indexes (Elasticsearch, Meilisearch) in sync.
CDC.GROUP CREATE mutations search-indexer 0
# Worker pushes docs to search engine

Audit Logs

Record all mutations for compliance.
CDCPOLL 0 1000
# Archive to S3/BigQuery

Replication

Replicate changes to secondary data stores.
CDC.GROUP CREATE mutations replicator 0
# Worker writes to PostgreSQL/MySQL

Configuration

Ring Buffer Size

Configure per-shard buffer capacity based on write rate and consumer lag:
// In kora-cdc/src/ring.rs
let ring = CdcRing::new(capacity);
Sizing formula (per shard):
capacity = shard_write_rate × max_acceptable_lag

Example:
  1000 writes/sec on one shard × 60 sec lag tolerance = 60,000 events
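Each shard has its own ring, so the rate in the formula is the write rate that a single shard sees. The following sketch performs the calculation from a cluster-wide rate. It assumes writes are spread evenly across shards and rounds up so that no shard is undersized. `ring_capacity` is a hypothetical helper, not a kora-cdc API.

```rust
/// Hypothetical helper: per-shard ring capacity (in events) from the total
/// write rate, the shard count and the longest consumer lag to tolerate.
/// Returns None for zero shards or if the result overflows.
pub fn ring_capacity(total_writes_per_sec: u64, shards: u64, max_lag_secs: u64) -> Option<usize> {
    if shards == 0 {
        return None;
    }
    // Assumes an even spread; round up without risking overflow.
    let per_shard_rate = total_writes_per_sec / shards + u64::from(total_writes_per_sec % shards != 0);
    per_shard_rate
        .checked_mul(max_lag_secs)
        .and_then(|events| usize::try_from(events).ok())
}

fn main() {
    // 4000 writes/sec over 4 shards is 1000 writes/sec per shard; 60 s of lag.
    match ring_capacity(4_000, 4, 60) {
        Some(cap) => println!("capacity = {cap} events per shard"),
        None => println!("invalid input or overflow"),
    }
}
```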

Consumer Timeouts

Consumer groups record how long each pending entry has been idle. Entries that stay unacknowledged past a timeout can then be redelivered:
# Check pending with idle time
CDC.PENDING stream:mutations mygroup
# => [[12345, "worker-1", 120000]]  # 120 seconds idle

# Claim stale entries
# (Implementation varies by client library)
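The following sketch shows one way to claim stale entries. It is similar in spirit to Redis Streams' XAUTOCLAIM: every pending entry that has been idle for at least a minimum time is handed to a new consumer, and its idle clock restarts. The `Pending` map layout and the `claim_stale` function are hypothetical, not kora-cdc's API.

```rust
use std::collections::BTreeMap;

/// Hypothetical pending table: seq -> (consumer, last delivery time in ms).
type Pending = BTreeMap<u64, (String, u64)>;

/// Hand every entry idle for at least `min_idle_ms` to `claimer`, restarting
/// its idle clock. Returns the claimed sequences in ascending order.
pub fn claim_stale(pending: &mut Pending, claimer: &str, min_idle_ms: u64, now_ms: u64) -> Vec<u64> {
    let mut claimed = Vec::new();
    for (seq, (consumer, delivered_ms)) in pending.iter_mut() {
        if now_ms.saturating_sub(*delivered_ms) >= min_idle_ms {
            *consumer = claimer.to_string();
            *delivered_ms = now_ms;
            claimed.push(*seq);
        }
    }
    claimed
}

fn main() {
    let mut pending = Pending::new();
    pending.insert(12_345, ("worker-1".to_string(), 0));
    pending.insert(12_350, ("worker-2".to_string(), 115_000));
    // At t = 120 s, worker-1's entry is 120 s idle and worker-2's is 5 s idle.
    let claimed = claim_stale(&mut pending, "worker-3", 60_000, 120_000);
    println!("worker-3 claimed {claimed:?}");
}
```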

Performance Tips

Poll in batches rather than one event at a time:
# Good: batch of 1000
CDCPOLL cursor 1000

# Bad: one at a time
CDCPOLL cursor 1
Multiple workers in the same group share the load:
# Worker 1
CDC.GROUP READ stream:mutations mygroup worker-1 100

# Worker 2 (gets different events)
CDC.GROUP READ stream:mutations mygroup worker-2 100
The ring buffer stores every event, so apply pattern or field filters in your consumer:
events = cdc_poll(cursor, 1000)
user_events = [e for e in events if e['key'].startswith(b'user:')]
Track consumer lag:
current_write_seq = ring.write_seq()
consumer_lag = current_write_seq - consumer_cursor

if consumer_lag > threshold:
    alert("CDC consumer falling behind")
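The lag check above can be extended with one more threshold. Once the ring has wrapped, `push` sets start_seq to write_seq minus capacity. A lag larger than the capacity therefore means the consumer's cursor is already below start_seq, so events have been lost. `LagStatus` and `lag_status` are hypothetical names for this sketch.

```rust
/// Hypothetical classification of one consumer's lag on one shard.
#[derive(Debug, PartialEq)]
pub enum LagStatus {
    /// Within the alert threshold.
    Ok(u64),
    /// Above the threshold but nothing lost yet.
    Behind(u64),
    /// Lag exceeds ring capacity: events were overwritten (a gap).
    Lost(u64),
}

/// `write_seq` is the ring's next sequence, `cursor` the consumer's next request.
pub fn lag_status(write_seq: u64, cursor: u64, capacity: u64, threshold: u64) -> LagStatus {
    let lag = write_seq.saturating_sub(cursor);
    if lag > capacity {
        LagStatus::Lost(lag)
    } else if lag > threshold {
        LagStatus::Behind(lag)
    } else {
        LagStatus::Ok(lag)
    }
}

fn main() {
    for cursor in [99_500, 90_000, 30_000] {
        println!("cursor {cursor}: {:?}", lag_status(100_000, cursor, 60_000, 1_000));
    }
}
```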

Next Steps

  • Pub/Sub: Push-mode messaging for real-time notifications
  • Persistence: Configure WAL for durability
  • Document Database: Stream JSON document changes
  • API Reference: Complete CDC.* command reference
