## Overview

Kora uses a shard-affinity I/O model in which each worker thread owns:

- A single-threaded Tokio runtime (`current_thread`)
- Its portion of the keyspace (determined by hash-based routing)
- Connection I/O for clients accessing local keys
- All data structures, with zero synchronization (`Rc<RefCell<>>`)

This eliminates lock contention on the data path and scales linearly with CPU cores. The design is inspired by Seastar/ScyllaDB's thread-per-core model, but implemented in Rust with Tokio for async I/O.
## Per-Shard Tokio Runtimes

### Worker Thread Creation

Each shard spawns an OS thread with its own `current_thread` Tokio runtime:
```rust
// From kora-server/src/shard_io/mod.rs:333
let handle = thread::Builder::new()
    .name(format!("kora-shard-io-{}", i))
    .spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to create shard tokio runtime");
        let local = tokio::task::LocalSet::new();
        local.block_on(&rt, async move {
            let store = Rc::new(RefCell::new(ShardStore::new(i as u16)));
            let wal = Rc::new(RefCell::new(wal_writer));
            shard_worker_loop(
                i as u16,
                store,
                wal,
                rx,
                router,
                shared,
                conn_counts,
                shutdown_rx,
                &bind_addr,
                unix_path.as_ref(),
            )
            .await;
        });
    })
    .expect("failed to spawn shard-io thread");
```
Key details:

- `current_thread` runtime: only the thread that owns the runtime drives its futures (no work-stealing)
- `LocalSet`: required to spawn `!Send` futures (because `Rc<RefCell<>>` is not `Send`)
- Each shard's data is completely isolated from other threads
### Why `current_thread` Instead of Multi-Threaded?

| Runtime Type | Locking Required | Work Stealing | Use Case |
| --- | --- | --- | --- |
| `multi_thread` | Yes (`Arc<Mutex<>>`) | Yes | General async workloads with shared state |
| `current_thread` | No (`Rc<RefCell<>>`) | No | Single-threaded event loops with isolated state |
Kora uses `current_thread` because:

- Each shard owns its data exclusively, so no cross-thread synchronization is needed
- Lock contention is eliminated on the hot path (GET/SET operations)
- Latency is predictable: no work-stealing means no cross-core cache invalidation
## Local vs Foreign Key Routing

### Local-Key Commands (Zero Hops)

When a command targets a key owned by the current shard, it executes inline with zero overhead:
```rust
// From kora-server/src/shard_io/connection.rs (simplified)
// Connection handler on Shard 0
let command = parse_command(&resp_value)?;
let key = command.key().unwrap();
let target_shard = router.shard_for_key(key);

if target_shard == shard_id {
    // LOCAL: execute inline (zero hops)
    let response = execute_with_wal_inline(&store, &wal, command);
    send_response_to_client(response);
} else {
    // FOREIGN: cross-shard hop required
    let rx = router.try_dispatch_foreign(target_shard, command)?;
    let response = rx.await?;
    send_response_to_client(response);
}
```
Performance: local-key commands have sub-microsecond latency because:

- No channel allocation
- No `await` overhead
- Store access is a direct `RefCell::borrow_mut()`
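The local-vs-foreign decision can be sketched with a std hasher. Note that `shard_for_key` here is a hypothetical stand-in; Kora's actual hash function is not shown in this section:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical routing function: a SipHash-based stand-in for the real
// hash-based key routing.
fn shard_for_key(key: &[u8], shard_count: u16) -> u16 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % shard_count as u64) as u16
}

fn main() {
    let local_shard = 0u16;
    let target = shard_for_key(b"user:1001", 4);
    if target == local_shard {
        // LOCAL: execute inline against this shard's store
        println!("local: execute inline");
    } else {
        // FOREIGN: one async hop to the owning shard
        println!("foreign: dispatch to shard {}", target);
    }
}
```

Because the routing function is deterministic, every connection handler reaches the same conclusion about which shard owns a key, with no coordination.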
### Foreign-Key Commands (One Hop)

When a command targets a key owned by a different shard, it takes one async hop via unbounded channels:
```rust
// From kora-server/src/shard_io/mod.rs:119
pub fn try_dispatch_foreign(
    &self,
    shard_id: u16,
    cmd: Command,
) -> Result<oneshot::Receiver<CommandResponse>, CommandResponse> {
    let (tx, rx) = oneshot::channel();
    self.shard_senders[shard_id as usize]
        .send(ShardRequest::Execute {
            command: cmd,
            queued_at: Instant::now(),
            response_tx: tx,
        })
        .map_err(|_| CommandResponse::Error("ERR shard unavailable".into()))?;
    Ok(rx)
}
```
Channel types:

- Command channel: `tokio::sync::mpsc::unbounded_channel` (per-shard)
- Response channel: `tokio::sync::oneshot::channel` (per-request)

Performance: foreign-key commands add roughly 1-2 μs of overhead:

- One allocation for the oneshot channel
- One context switch to the target shard's runtime
- The target shard executes inline and sends the response back
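The request-with-embedded-response-channel shape can be sketched with std channels alone. Everything here (`Request`, `foreign_get`, the `HashMap` store) is a hypothetical stand-in for `ShardRequest`, `try_dispatch_foreign`, and `ShardStore`:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Simplified stand-in for ShardRequest: a command plus a channel the target
// shard uses to send the response back (std analogue of mpsc + oneshot).
struct Request {
    key: String,
    response_tx: mpsc::Sender<Option<String>>,
}

// Dispatch one foreign-key GET to a worker thread that exclusively owns
// its data, then wait for the reply.
fn foreign_get(key: &str) -> Option<String> {
    let (tx, rx) = mpsc::channel::<Request>();

    // "Target shard" thread: owns its store, answers requests until the
    // command channel closes.
    let worker = thread::spawn(move || {
        let mut store = HashMap::new();
        store.insert("k1".to_string(), "v1".to_string());
        for req in rx {
            let _ = req.response_tx.send(store.get(&req.key).cloned());
        }
    });

    // "Origin shard": send the request with an embedded response channel.
    let (resp_tx, resp_rx) = mpsc::channel();
    tx.send(Request { key: key.to_string(), response_tx: resp_tx }).unwrap();
    let response = resp_rx.recv().unwrap();

    drop(tx); // closing the channel lets the worker loop exit
    worker.join().unwrap();
    response
}

fn main() {
    println!("{:?}", foreign_get("k1")); // Some("v1")
}
```

The data never crosses threads; only the request and response messages do, which is why no lock is needed on either side.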
## `Rc<RefCell<>>` vs `Arc<Mutex<>>`

### Why Not `Arc<Mutex<>>`?

Kora's data structures use `Rc<RefCell<>>` instead of `Arc<Mutex<>>`:
```rust
// From kora-server/src/shard_io/mod.rs:343
let store = Rc::new(RefCell::new(ShardStore::new(i as u16)));
let wal = Rc::new(RefCell::new(wal_writer));
```
`Rc<RefCell<>>`:

- ✅ Zero atomic operations (no `fetch_add` on the refcount)
- ✅ No lock contention (single-threaded access)
- ✅ Runtime borrow checking (panics on aliasing violations)
- ❌ Not `Send` or `Sync` (cannot cross thread boundaries)

`Arc<Mutex<>>`:

- ❌ Atomic refcount updates on every clone/drop
- ❌ Lock contention under high concurrency
- ❌ Potential deadlocks with nested locks
- ✅ Can be shared across threads

`Rc<RefCell<>>` is safe here because each shard's data is accessed only by its own thread. Cross-shard communication uses message passing, not shared memory.
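`RefCell`'s runtime borrow check can be observed directly. This is a std-only sketch, unrelated to Kora's actual types:

```rust
use std::cell::RefCell;
use std::rc::Rc;

fn main() {
    let store = Rc::new(RefCell::new(vec![1, 2, 3]));

    // Mutation is a plain runtime borrow-flag check: no atomics, no locks.
    store.borrow_mut().push(4);

    // Aliasing violations are caught at runtime rather than by a lock:
    // while a shared borrow is live, a second mutable borrow is refused
    // (borrow_mut would panic; try_borrow_mut returns Err).
    let guard = store.borrow();
    assert!(store.try_borrow_mut().is_err());
    drop(guard);

    println!("{:?}", store.borrow()); // [1, 2, 3, 4]
}
```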
### Shared State via Message Passing

State that must be shared across shards uses thread-safe types inside a single `Arc`-shared struct:
```rust
// From kora-server/src/shard_io/mod.rs:208
pub(crate) struct AffinitySharedState {
    pub pub_sub: PubSubBroker,              // Thread-safe pub/sub broker
    pub cdc_rings: Vec<Mutex<CdcRing>>,     // Per-shard CDC ring buffers
    pub doc_engine: RwLock<DocEngine>,      // Document database engine
    pub histograms: Arc<CommandHistograms>, // Latency histograms
    pub runtime_config: RwLock<RuntimeConfig>,
    pub router: ShardRouter,                // Routes commands to shards
    // ...
}
```
These are accessed via:

- Read-heavy: `RwLock` for concurrent reads, exclusive writes
- Write-heavy: `Mutex` for per-shard serialization
- Fire-and-forget: `mpsc::unbounded_channel` for async requests
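As a std-library illustration of the read-heavy pattern: the `maxmemory` strings are made up, and `runtime_config` is modeled here as a plain `RwLock<String>`:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    // Read-mostly shared state, as with runtime_config: many concurrent
    // readers, occasional exclusive writer.
    let config = Arc::new(RwLock::new(String::from("maxmemory=1gb")));

    // Several reader threads hold shared locks concurrently.
    let readers: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&config);
            thread::spawn(move || c.read().unwrap().len())
        })
        .collect();
    for r in readers {
        assert!(r.join().unwrap() > 0);
    }

    // A writer takes the lock exclusively.
    *config.write().unwrap() = String::from("maxmemory=2gb");
    println!("{}", config.read().unwrap());
}
```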
### No Locks on the Data Path

The critical path for GET/SET/INCR operations never acquires a lock:
```rust
// From kora-server/src/shard_io/mod.rs:848
pub(crate) fn execute_with_wal_inline(
    store: &Rc<RefCell<ShardStore>>,
    wal_writer: &Rc<RefCell<Option<Box<dyn WalWriter>>>>,
    command: Command,
) -> CommandResponse {
    // WAL write (if mutation)
    {
        let mut wal = wal_writer.borrow_mut(); // RefCell::borrow_mut (no lock)
        if let Some(ref mut writer) = *wal {
            if command.is_mutation() {
                if let Some(record) = command_to_wal_record(&command) {
                    writer.append(&record);
                }
            }
        }
    }
    // Execute command
    store.borrow_mut().execute(command) // RefCell::borrow_mut (no lock)
}
```
No atomic operations, no syscalls for synchronization: just a runtime borrow check.

Never introduce `Arc<Mutex<>>` on the data path. If you need shared state, use message passing via `tokio::sync::mpsc`.
## Worker Event Loop

Each shard worker runs a `tokio::select!` loop over four event sources:
```rust
// From kora-server/src/shard_io/mod.rs:528
loop {
    tokio::select! {
        // 1. New TCP connections (via SO_REUSEPORT)
        result = listener.accept() => {
            let Ok((stream, _addr)) = result else { continue };
            let _ = stream.set_nodelay(true);
            spawn_connection_handler(
                stream, shard_id, &store, &wal_writer, &router, &shared, &conn_counts,
            );
        }
        // 2. Unix socket connections (Shard 0 only)
        result = async {
            match &unix_listener {
                Some(ul) => ul.accept().await.map(|(s, _)| s),
                None => std::future::pending().await,
            }
        } => {
            if let Ok(stream) = result {
                spawn_connection_handler(
                    stream, shard_id, &store, &wal_writer, &router, &shared, &conn_counts,
                );
            }
        }
        // 3. Cross-shard requests (via mpsc channel)
        req = rx.recv() => {
            let Some(req) = req else { break };
            handle_shard_request(
                shard_id,
                &store,
                &wal_writer,
                &shared,
                req,
                &mut ops_since_expire,
            );
        }
        // 4. Shutdown signal
        _ = shutdown.changed() => {
            break;
        }
    }
}
```
## SO_REUSEPORT for Load Balancing

Kora sets `SO_REUSEPORT` on each shard's TCP listener, allowing the kernel to distribute incoming connections:

```rust
// From kora-server/src/shard_io/mod.rs:436
#[cfg(not(windows))]
socket.set_reuseport(true)?;
```
How it works:

- All N shard workers bind to the same IP:PORT
- The kernel distributes incoming connections across the listening sockets using a hash of the connection 5-tuple
- No dedicated accept thread: each shard accepts and handles its own connections

Benefits:

- Eliminates the accept-thread bottleneck
- Better cache locality (connection data stays on the same core)
- Automatic load balancing without application logic
## Periodic Expiration Sweeps

Each shard periodically sweeps expired keys to avoid memory leaks from forgotten TTLs:
```rust
// From kora-server/src/shard_io/mod.rs:866
fn maybe_sweep_inline(store: &Rc<RefCell<ShardStore>>, ops_since_expire: &mut u32) {
    *ops_since_expire += 1;
    if *ops_since_expire >= EXPIRE_SWEEP_INTERVAL_OPS {
        let _ = store
            .borrow_mut()
            .evict_expired_sample(EXPIRE_SWEEP_SAMPLE_SIZE);
        *ops_since_expire = 0;
    }
}

const EXPIRE_SWEEP_INTERVAL_OPS: u32 = 4096;
const EXPIRE_SWEEP_SAMPLE_SIZE: usize = 64;
```
Strategy:

- Every 4096 operations, sample 64 random keys
- Delete any whose TTL has expired
- This amortizes expiration cost across many operations

Kora uses lazy expiration (check TTL on access) plus periodic sampling (bounded sweep). This matches Redis's hybrid approach.
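The sampled sweep can be sketched against a plain `HashMap`. The `Entry` type and its fields are hypothetical (`ShardStore`'s real layout is not shown here), and this sketch takes the first N iterated entries rather than a random sample:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical entry type: a value plus an optional expiry deadline.
struct Entry {
    value: String,
    expires_at: Option<Instant>,
}

// Examine up to `sample` entries and drop any whose TTL has passed,
// bounding the work done per sweep regardless of store size.
fn evict_expired_sample(store: &mut HashMap<String, Entry>, sample: usize) -> usize {
    let now = Instant::now();
    let expired: Vec<String> = store
        .iter()
        .take(sample)
        .filter(|(_, e)| e.expires_at.map_or(false, |t| t <= now))
        .map(|(k, _)| k.clone())
        .collect();
    for k in &expired {
        store.remove(k);
    }
    expired.len()
}

fn main() {
    let mut store = HashMap::new();
    store.insert(
        "live".to_string(),
        Entry { value: "v".to_string(), expires_at: None },
    );
    store.insert(
        "dead".to_string(),
        Entry {
            value: "v".to_string(),
            // A deadline already in the past (checked_sub avoids underflow).
            expires_at: Instant::now().checked_sub(Duration::from_secs(1)),
        },
    );
    let removed = evict_expired_sample(&mut store, 64);
    println!("removed {removed} expired, {} remain", store.len());
}
```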
## Cross-Shard Communication Patterns

### 1. Single-Key Foreign Request
```rust
// From kora-server/src/shard_io/dispatch.rs (conceptual)
let (tx, rx) = oneshot::channel();
router.shard_senders[foreign_shard]
    .send(ShardRequest::Execute { command, response_tx: tx })?;
let response = timeout(CROSS_SHARD_TIMEOUT, rx).await??;
```
### 2. Multi-Key Fan-Out (MGET)

```rust
// From kora-core/src/shard/engine.rs:229
Command::MGet { keys } => {
    let mut shard_requests: Vec<Vec<(usize, Vec<u8>)>> = vec![vec![]; self.shard_count];
    for (i, key) in keys.iter().enumerate() {
        let shard_id = shard_for_key(key, self.shard_count) as usize;
        shard_requests[shard_id].push((i, key.clone()));
    }
    // Send a sub-request to each shard
    for (shard_id, shard_keys) in shard_requests.into_iter().enumerate() {
        let (resp_tx, resp_rx) = response_channel();
        self.workers[shard_id].tx.send(ShardMessage::Single {
            command: Command::MGet { keys: shard_keys },
            response_tx: resp_tx,
        });
        receivers.push((indices, resp_rx));
    }
    // Merge responses in original order
}
```
### 3. Broadcast (DBSIZE, FLUSHDB)

```rust
// From kora-server/src/shard_io/mod.rs:179
pub async fn dispatch_broadcast<F>(&self, cmd_factory: F) -> Vec<CommandResponse>
where
    F: Fn() -> Command,
{
    let mut receivers = Vec::with_capacity(self.shard_count);
    for sender in self.shard_senders.iter() {
        let (tx, rx) = oneshot::channel();
        let _ = sender.send(ShardRequest::Execute {
            command: cmd_factory(),
            queued_at: Instant::now(),
            response_tx: tx,
        });
        receivers.push(rx);
    }
    // Collect all responses
}
```
| Operation Type | Latency Overhead | Allocations | Locks |
| --- | --- | --- | --- |
| Local-key GET | ~0 ns (inline) | 0 | 0 |
| Local-key SET | ~0 ns (inline) | WAL buffer grow (rare) | 0 |
| Foreign-key GET | ~1-2 μs | 1 oneshot channel | 0 |
| MGET (10 keys) | ~2-5 μs | N oneshot channels | 0 |
| DBSIZE (broadcast) | ~5-10 μs | N oneshot channels | 0 |

Measurements are from an AWS m5.xlarge (4 vCPU, 3.1 GHz) with 4 shard workers.
## Next Steps

- Sharding: learn how hash-based key routing works
- Architecture: understand the overall system design