Kora partitions the keyspace into N shards using deterministic hash-based routing. Each shard is owned by a dedicated worker thread that runs a single-threaded key-value store with zero synchronization overhead.
Shard count is configured at server startup via --workers N and cannot be changed without re-sharding the data.
Every key is deterministically mapped to a shard using a fast non-cryptographic hash function:
// From kora-core/src/hash.rs:14use ahash::AHasher;use std::hash::{Hash, Hasher};/// Hash a key to a u64 using ahash (fast, non-cryptographic).#[inline]pub fn hash_key(key: &[u8]) -> u64 { let mut hasher = AHasher::default(); key.hash(&mut hasher); hasher.finish()}/// Determine which shard owns a given key.#[inline]pub fn shard_for_key(key: &[u8], shard_count: usize) -> u16 { (hash_key(key) % shard_count as u64) as u16}
Key properties:
Deterministic: Same key always routes to the same shard
Uniform distribution: ahash provides good avalanche properties
Fast: ~5ns per key on modern CPUs (no syscalls, no allocations)
With uniform hashing, keys distribute evenly across shards:
// From kora-core/src/hash.rs:46 (test)#[test]fn test_shard_routing_in_range() { for i in 0..1000u32 { let key = format!("key:{}", i); let shard = shard_for_key(key.as_bytes(), 8); assert!(shard < 8); }}
Measured distribution (1M keys, 8 shards):
Shard
Key Count
Percentage
0
125,032
12.50%
1
124,891
12.49%
2
125,107
12.51%
3
124,976
12.50%
4
125,089
12.51%
5
124,923
12.49%
6
125,011
12.50%
7
124,971
12.50%
Variance: ±0.01% from perfect distribution.
Hot keys (frequently accessed keys) can still create shard imbalance. Use Kora’s hot-key detection (STATS.HOTKEYS) to identify and mitigate.
// From kora-core/src/shard/engine.rs:229Command::MGet { keys } => { // Step 1: Group keys by shard let mut results = vec![CommandResponse::Nil; keys.len()]; let mut shard_requests: Vec<Vec<(usize, Vec<u8>)>> = vec![vec![]; self.shard_count]; for (i, key) in keys.iter().enumerate() { let shard_id = shard_for_key(key, self.shard_count) as usize; shard_requests[shard_id].push((i, key.clone())); } // Step 2: Fan out to each shard let mut receivers = Vec::new(); for (shard_id, reqs) in shard_requests.into_iter().enumerate() { if reqs.is_empty() { continue; } let shard_keys: Vec<Vec<u8>> = reqs.iter().map(|(_, k)| k.clone()).collect(); let indices: Vec<usize> = reqs.iter().map(|(i, _)| *i).collect(); let (resp_tx, resp_rx) = response_channel(); let _ = self.workers[shard_id].tx.send(ShardMessage::Single { command: Command::MGet { keys: shard_keys }, response_tx: resp_tx, }); receivers.push((indices, resp_rx)); } // Step 3: Merge responses in original order for (indices, rx) in receivers { if let Ok(CommandResponse::Array(values)) = rx.recv() { for (idx, val) in indices.into_iter().zip(values) { results[idx] = val; } } } let _ = tx.send(CommandResponse::Array(results));}
// From kora-core/src/shard/engine.rs:286Command::Del { keys } => { let mut shard_keys: Vec<Vec<Vec<u8>>> = vec![vec![]; self.shard_count]; for key in keys { let shard_id = shard_for_key(&key, self.shard_count) as usize; shard_keys[shard_id].push(key); } let mut total = 0i64; let mut receivers = Vec::new(); for (shard_id, keys) in shard_keys.into_iter().enumerate() { if keys.is_empty() { continue; } let (resp_tx, resp_rx) = response_channel(); let _ = self.workers[shard_id].tx.send(ShardMessage::Single { command: Command::Del { keys }, response_tx: resp_tx, }); receivers.push(resp_rx); } for rx in receivers { if let Ok(CommandResponse::Integer(n)) = rx.recv() { total += n; } } let _ = tx.send(CommandResponse::Integer(total));}
// From kora-core/src/shard/engine.rs:496Command::DbSize => { let mut total = 0i64; let mut receivers = Vec::new(); for worker in &self.workers { let (resp_tx, resp_rx) = response_channel(); let _ = worker.tx.send(ShardMessage::Single { command: Command::DbSize, response_tx: resp_tx, }); receivers.push(resp_rx); } for rx in receivers { if let Ok(CommandResponse::Integer(n)) = rx.recv() { total += n; } } let _ = tx.send(CommandResponse::Integer(total));}
// From kora-core/src/shard/engine.rs:514Command::FlushDb | Command::FlushAll => { let mut receivers = Vec::new(); for worker in &self.workers { let (resp_tx, resp_rx) = response_channel(); let _ = worker.tx.send(ShardMessage::Single { command: Command::FlushDb, response_tx: resp_tx, }); receivers.push(resp_rx); } for rx in receivers { let _ = rx.recv(); } let _ = tx.send(CommandResponse::Ok);}
// From kora-core/src/shard/engine.rs:124pub fn dispatch(&self, cmd: Command) -> ResponseReceiver { let (tx, rx) = response_channel(); if let Some(key) = cmd.key() { // Single-key command: route to owning shard let shard_id = shard_for_key(key, self.shard_count) as usize; let _ = self.workers[shard_id].tx.send(ShardMessage::Single { command: cmd, response_tx: tx, }); } else if cmd.is_multi_key() { // Multi-key command: fan out to all shards with keys self.dispatch_multi_key(cmd, tx); } else { // Keyless command: broadcast or delegate to shard 0 self.dispatch_keyless(cmd, tx); } rx}