
Overview

Kora uses a shard-affinity I/O model where each worker thread owns:
  1. A single-threaded Tokio runtime (current_thread)
  2. Its portion of the keyspace (determined by hash-based routing)
  3. Connection I/O for clients accessing local keys
  4. All data structures with zero synchronization (Rc<RefCell<>>)
This eliminates lock contention on the data path and achieves linear scaling with CPU cores.
Inspired by Seastar/ScyllaDB’s thread-per-core model, but implemented in Rust with Tokio for async I/O.
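The hash-based routing in point 2 can be sketched as follows. Kora's actual hash function isn't shown on this page, so this sketch substitutes the standard library's DefaultHasher; `shard_for_key` here is illustrative, not the real implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative routing sketch: the real hash function may differ, but the
// contract is the same -- a key deterministically maps to exactly one shard.
fn shard_for_key(key: &[u8], shard_count: usize) -> u16 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize % shard_count) as u16
}

fn main() {
    // The same key always lands on the same shard, which is what lets a
    // worker own its slice of the keyspace with zero synchronization.
    let shard = shard_for_key(b"user:1000", 4);
    assert_eq!(shard, shard_for_key(b"user:1000", 4));
    assert!((shard as usize) < 4);
}
```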

Per-Shard Tokio Runtimes

Worker Thread Creation

Each shard spawns an OS thread with its own current_thread Tokio runtime:
// From kora-server/src/shard_io/mod.rs:333
let handle = thread::Builder::new()
    .name(format!("kora-shard-io-{}", i))
    .spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to create shard tokio runtime");

        let local = tokio::task::LocalSet::new();
        local.block_on(&rt, async move {
            let store = Rc::new(RefCell::new(ShardStore::new(i as u16)));
            let wal = Rc::new(RefCell::new(wal_writer));
            shard_worker_loop(
                i as u16,
                store,
                wal,
                rx,
                router,
                shared,
                conn_counts,
                shutdown_rx,
                &bind_addr,
                unix_path.as_ref(),
            )
            .await;
        });
    })
    .expect("failed to spawn shard-io thread");
Key details:
  • current_thread runtime: Only the current thread can drive futures (no work-stealing)
  • LocalSet: Required to spawn !Send futures (because Rc<RefCell<>> is not Send)
  • Each shard’s data is completely isolated from other threads

Why current_thread Instead of Multi-Threaded?

| Runtime Type | Locking Required | Work Stealing | Use Case |
| --- | --- | --- | --- |
| multi_thread | Yes (Arc&lt;Mutex&lt;&gt;&gt;) | Yes | General async workloads with shared state |
| current_thread | No (Rc&lt;RefCell&lt;&gt;&gt;) | No | Single-threaded event loops with isolated state |
Kora uses current_thread because:
  1. Each shard owns its data exclusively — no need for cross-thread synchronization
  2. Eliminates lock contention on the hot path (GET/SET operations)
  3. Predictable latency — no work-stealing means no cross-core cache invalidation

Local vs Foreign Key Routing

Local-Key Commands (Zero Hops)

When a command targets a key owned by the current shard, it executes inline with zero overhead:
// From kora-server/src/shard_io/connection.rs (simplified)
// Connection handler on Shard 0
let command = parse_command(&resp_value)?;
let key = command.key().unwrap();
let target_shard = router.shard_for_key(key);

if target_shard == shard_id {
    // LOCAL: Execute inline (zero hops)
    let response = execute_with_wal_inline(&store, &wal, command);
    send_response_to_client(response);
} else {
    // FOREIGN: Cross-shard hop required
    let rx = router.try_dispatch_foreign(target_shard, command)?;
    let response = rx.await?;
    send_response_to_client(response);
}
Performance: Local-key commands have sub-microsecond latency because:
  • No channel allocation
  • No async/await overhead (executes synchronously inline)
  • Store access is direct RefCell::borrow_mut()

Foreign-Key Commands (One Hop)

When a command targets a key owned by a different shard, it takes one async hop via unbounded channels:
// From kora-server/src/shard_io/mod.rs:119
pub fn try_dispatch_foreign(
    &self,
    shard_id: u16,
    cmd: Command,
) -> Result<oneshot::Receiver<CommandResponse>, CommandResponse> {
    let (tx, rx) = oneshot::channel();
    self.shard_senders[shard_id as usize]
        .send(ShardRequest::Execute {
            command: cmd,
            queued_at: Instant::now(),
            response_tx: tx,
        })
        .map_err(|_| CommandResponse::Error("ERR shard unavailable".into()))?;
    Ok(rx)
}
Channel types:
  • Command channel: tokio::sync::mpsc::unbounded_channel (per-shard)
  • Response channel: tokio::sync::oneshot::channel (per-request)
Performance: Foreign-key commands add ~1-2μs overhead:
  • One allocation for the oneshot channel
  • One context switch to the target shard’s runtime
  • Target shard executes inline, sends response back
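The hop above can be mimicked with std channels. This is a dependency-free analogue, not Kora's code: tokio's mpsc and oneshot are replaced with std::sync::mpsc, and the command and response types are simplified stand-ins.

```rust
use std::sync::mpsc;
use std::thread;

// Simplified stand-in for the real ShardRequest; the response channel is a
// plain std mpsc sender instead of a tokio oneshot.
enum ShardRequest {
    Execute { command: String, response_tx: mpsc::Sender<String> },
}

// One foreign-key round trip: send the command to the owning shard's
// channel, then wait on the per-request response channel.
fn dispatch_and_wait(shard_tx: &mpsc::Sender<ShardRequest>, command: &str) -> String {
    let (tx, rx) = mpsc::channel(); // one allocation per request
    shard_tx
        .send(ShardRequest::Execute { command: command.to_string(), response_tx: tx })
        .expect("shard unavailable");
    rx.recv().expect("shard dropped response")
}

fn main() {
    let (shard_tx, shard_rx) = mpsc::channel::<ShardRequest>();

    // The target shard's worker: executes inline, replies on the per-request channel.
    let worker = thread::spawn(move || {
        for req in shard_rx {
            match req {
                ShardRequest::Execute { command, response_tx } => {
                    let _ = response_tx.send(format!("OK {}", command));
                }
            }
        }
    });

    assert_eq!(dispatch_and_wait(&shard_tx, "GET foo"), "OK GET foo");

    drop(shard_tx); // closing the command channel ends the worker loop
    worker.join().unwrap();
}
```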

Rc<RefCell<>> vs Arc<Mutex<>>

Why Not Arc<Mutex<>>?

Kora’s data structures use Rc<RefCell<>> instead of Arc<Mutex<>>:
// From kora-server/src/shard_io/mod.rs:343
let store = Rc::new(RefCell::new(ShardStore::new(i as u16)));
let wal = Rc::new(RefCell::new(wal_writer));
Rc<RefCell<>>:
  • ✅ Zero atomic operations (no fetch_add on refcount)
  • ✅ No lock contention (single-threaded access)
  • ✅ Runtime borrow checking (panics on aliasing violations instead of deadlocking)
  • ❌ Not Send or Sync (cannot cross thread boundaries)
Arc<Mutex<>>:
  • ❌ Atomic refcount updates on every clone/drop
  • ❌ Lock contention under high concurrency
  • ❌ Potential deadlocks with nested locks
  • ✅ Can be shared across threads
Rc<RefCell<>> is safe here because each shard’s data is accessed only by its own thread. Cross-shard communication uses message passing, not shared memory.

Shared State via Message Passing

State that must be shared across shards uses thread-safe types wrapped in Arc:
// From kora-server/src/shard_io/mod.rs:208
pub(crate) struct AffinitySharedState {
    pub pub_sub: PubSubBroker,              // Thread-safe pub/sub broker
    pub cdc_rings: Vec<Mutex<CdcRing>>,     // Per-shard CDC ring buffers
    pub doc_engine: RwLock<DocEngine>,      // Document database engine
    pub histograms: Arc<CommandHistograms>, // Latency histograms
    pub runtime_config: RwLock<RuntimeConfig>,
    pub router: ShardRouter,                // Routes commands to shards
    // ...
}
These are accessed via:
  • Read-heavy: RwLock for concurrent reads, exclusive writes
  • Write-heavy: Mutex for per-shard serialization
  • Fire-and-forget: mpsc::unbounded_channel for async requests
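The read-heavy case can be sketched with std's RwLock. The struct below is a simplified stand-in for Kora's RuntimeConfig: several shard threads take cheap shared read guards concurrently, while a config update briefly takes the lock exclusively.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Simplified stand-in for Kora's RuntimeConfig.
struct RuntimeConfig {
    max_memory_mb: u64,
}

// Readers on several threads hold shared guards concurrently; this mirrors
// the "read-heavy" access pattern described above.
fn read_from_shards(config: &Arc<RwLock<RuntimeConfig>>, shards: usize) -> Vec<u64> {
    let handles: Vec<_> = (0..shards)
        .map(|_| {
            let cfg = Arc::clone(config);
            thread::spawn(move || cfg.read().unwrap().max_memory_mb)
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let config = Arc::new(RwLock::new(RuntimeConfig { max_memory_mb: 512 }));
    assert_eq!(read_from_shards(&config, 4), vec![512; 4]);

    // Exclusive write for a config update.
    config.write().unwrap().max_memory_mb = 1024;
    assert_eq!(config.read().unwrap().max_memory_mb, 1024);
}
```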

No Locks on the Data Path

The critical path for GET/SET/INCR operations never acquires a lock:
// From kora-server/src/shard_io/mod.rs:848
pub(crate) fn execute_with_wal_inline(
    store: &Rc<RefCell<ShardStore>>,
    wal_writer: &Rc<RefCell<Option<Box<dyn WalWriter>>>>,
    command: Command,
) -> CommandResponse {
    // WAL write (if mutation)
    {
        let mut wal = wal_writer.borrow_mut(); // RefCell::borrow_mut (no lock)
        if let Some(ref mut writer) = *wal {
            if command.is_mutation() {
                if let Some(record) = command_to_wal_record(&command) {
                    writer.append(&record);
                }
            }
        }
    }
    // Execute command
    store.borrow_mut().execute(command) // RefCell::borrow_mut (no lock)
}
No atomic operations, no syscalls for synchronization — just a runtime borrow check.
Never introduce Arc<Mutex<>> on the data path. If you need shared state, use message passing via tokio::sync::mpsc.

Worker Event Loop

Each shard worker runs a tokio::select! loop handling four event sources:
// From kora-server/src/shard_io/mod.rs:528
loop {
    tokio::select! {
        // 1. New TCP connections (via SO_REUSEPORT)
        result = listener.accept() => {
            let Ok((stream, _addr)) = result else { continue };
            let _ = stream.set_nodelay(true);
            spawn_connection_handler(
                stream, shard_id, &store, &wal_writer, &router, &shared, &conn_counts,
            );
        }
        // 2. Unix socket connections (Shard 0 only)
        result = async {
            match &unix_listener {
                Some(ul) => ul.accept().await.map(|(s, _)| s),
                None => std::future::pending().await,
            }
        } => {
            if let Ok(stream) = result {
                spawn_connection_handler(
                    stream, shard_id, &store, &wal_writer, &router, &shared, &conn_counts,
                );
            }
        }
        // 3. Cross-shard requests (via mpsc channel)
        req = rx.recv() => {
            let Some(req) = req else { break };
            handle_shard_request(
                shard_id,
                &store,
                &wal_writer,
                &shared,
                req,
                &mut ops_since_expire,
            );
        }
        // 4. Shutdown signal
        _ = shutdown.changed() => {
            break;
        }
    }
}

SO_REUSEPORT for Load Balancing

Kora sets SO_REUSEPORT on each shard’s TCP listener, allowing the kernel to distribute incoming connections:
// From kora-server/src/shard_io/mod.rs:436
#[cfg(not(windows))]
socket.set_reuseport(true)?;
How it works:
  • All N shard workers bind to the same IP:PORT
  • Kernel distributes accept() calls across workers using a hash of the connection 5-tuple
  • No dedicated accept thread — each shard handles its own connections
Benefits:
  • Eliminates accept-thread bottleneck
  • Better cache locality (connection data stays on the same core)
  • Automatic load balancing without application logic

Periodic Expiration Sweeps

Each shard periodically sweeps expired keys to avoid memory leaks from forgotten TTLs:
// From kora-server/src/shard_io/mod.rs:866
fn maybe_sweep_inline(store: &Rc<RefCell<ShardStore>>, ops_since_expire: &mut u32) {
    *ops_since_expire += 1;
    if *ops_since_expire >= EXPIRE_SWEEP_INTERVAL_OPS {
        let _ = store
            .borrow_mut()
            .evict_expired_sample(EXPIRE_SWEEP_SAMPLE_SIZE);
        *ops_since_expire = 0;
    }
}

const EXPIRE_SWEEP_INTERVAL_OPS: u32 = 4096;
const EXPIRE_SWEEP_SAMPLE_SIZE: usize = 64;
Strategy:
  • Every 4096 operations, sample 64 random keys
  • Delete any that have expired TTLs
  • Amortizes expiration cost across many operations
Kora uses lazy expiration (check TTL on access) + periodic sampling (bounded sweep). This matches Redis’s hybrid approach.
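The internals of evict_expired_sample aren't shown on this page; the following is a plausible dependency-free sketch of the sampling step. The real store presumably samples randomly and uses its own key and value types; here HashMap's arbitrary iteration order loosely stands in for random sampling.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical sketch of a TTL-aware store: each entry carries a value and
// an optional absolute deadline.
struct Store {
    data: HashMap<String, (String, Option<Instant>)>,
}

impl Store {
    // Inspect up to `sample_size` entries and delete any whose TTL has
    // passed, bounding the work done per sweep.
    fn evict_expired_sample(&mut self, sample_size: usize) -> usize {
        let now = Instant::now();
        let expired: Vec<String> = self
            .data
            .iter()
            .take(sample_size) // bounded sample (real store: random sample)
            .filter(|(_, (_, deadline))| matches!(deadline, Some(d) if *d <= now))
            .map(|(k, _)| k.clone())
            .collect();
        for k in &expired {
            self.data.remove(k);
        }
        expired.len()
    }
}

fn main() {
    let mut store = Store { data: HashMap::new() };
    let past = Instant::now() - Duration::from_secs(1);
    store.data.insert("stale".into(), ("v".into(), Some(past)));
    store.data.insert("fresh".into(), ("v".into(), None));

    let evicted = store.evict_expired_sample(64);
    assert_eq!(evicted, 1);
    assert!(store.data.contains_key("fresh"));
    assert!(!store.data.contains_key("stale"));
}
```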

Cross-Shard Communication Patterns

1. Single-Key Foreign Request

// From kora-server/src/shard_io/dispatch.rs (conceptual)
let (tx, rx) = oneshot::channel();
router.shard_senders[foreign_shard]
    .send(ShardRequest::Execute { command, response_tx: tx })?;
let response = timeout(CROSS_SHARD_TIMEOUT, rx).await??;

2. Multi-Key Fan-Out (MGET)

// From kora-core/src/shard/engine.rs:229
Command::MGet { keys } => {
    let mut shard_requests: Vec<Vec<(usize, Vec<u8>)>> = vec![vec![]; self.shard_count];
    for (i, key) in keys.iter().enumerate() {
        let shard_id = shard_for_key(key, self.shard_count) as usize;
        shard_requests[shard_id].push((i, key.clone()));
    }
    // Send sub-request to each shard
    for (shard_id, shard_keys) in shard_requests.into_iter().enumerate() {
        // Split the caller's original positions from the keys so responses
        // can be merged back into the original order.
        let (indices, keys): (Vec<usize>, Vec<Vec<u8>>) =
            shard_keys.into_iter().unzip();
        let (resp_tx, resp_rx) = response_channel();
        self.workers[shard_id].tx.send(ShardMessage::Single {
            command: Command::MGet { keys },
            response_tx: resp_tx,
        });
        receivers.push((indices, resp_rx));
    }
    // Merge responses in original order
}
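The final merge step (elided above as a comment) can be sketched like this; the value type is a simplified stand-in for Kora's real response type.

```rust
// Each shard returns values for its subset of keys, tagged with the
// caller's original indices; merging is a simple scatter by index.
fn merge_in_order(
    total: usize,
    per_shard: Vec<(Vec<usize>, Vec<Option<String>>)>, // (original indices, values)
) -> Vec<Option<String>> {
    let mut merged: Vec<Option<String>> = vec![None; total];
    for (indices, values) in per_shard {
        for (idx, value) in indices.into_iter().zip(values) {
            merged[idx] = value;
        }
    }
    merged
}

fn main() {
    // Keys 0 and 2 lived on shard A; key 1 on shard B.
    let shard_a = (vec![0, 2], vec![Some("a".to_string()), Some("c".to_string())]);
    let shard_b = (vec![1], vec![Some("b".to_string())]);
    let merged = merge_in_order(3, vec![shard_a, shard_b]);
    assert_eq!(
        merged,
        vec![Some("a".to_string()), Some("b".to_string()), Some("c".to_string())]
    );
}
```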

3. Broadcast (DBSIZE, FLUSHDB)

// From kora-server/src/shard_io/mod.rs:179
pub async fn dispatch_broadcast<F>(&self, cmd_factory: F) -> Vec<CommandResponse>
where
    F: Fn() -> Command,
{
    let mut receivers = Vec::with_capacity(self.shard_count);
    for sender in self.shard_senders.iter() {
        let (tx, rx) = oneshot::channel();
        let _ = sender.send(ShardRequest::Execute {
            command: cmd_factory(),
            queued_at: Instant::now(),
            response_tx: tx,
        });
        receivers.push(rx);
    }
    // Collect all responses
}

Performance Characteristics

| Operation Type | Latency Overhead | Allocations | Locks |
| --- | --- | --- | --- |
| Local-key GET | ~0 ns (inline) | 0 | 0 |
| Local-key SET | ~0 ns (inline) | WAL buffer grow (rare) | 0 |
| Foreign-key GET | ~1-2 μs | 1 oneshot channel | 0 |
| MGET (10 keys) | ~2-5 μs | N oneshot channels | 0 |
| DBSIZE (broadcast) | ~5-10 μs | N oneshot channels | 0 |
Measurements from AWS m5.xlarge (4 vCPU, 3.1 GHz) with 4 shard workers.

Next Steps

Sharding

Learn how hash-based key routing works

Architecture

Understand the overall system design