
In-Memory Store

ValKeyper uses a simple but effective in-memory data model based on Go’s native map types:
type KVStore struct {
    store          map[string]string       // Main key-value storage
    expiryMap      map[string]chan int     // Expiry management
    Stream         map[string][]StreamEntry // Stream data structures
    // ... other fields ...
}

Core Data Types

The primary data type in ValKeyper is the string, stored in a plain Go map:
store map[string]string
Operations:
// SET
kv.store[key] = value

// GET
val, ok := kv.store[key]

// DEL
delete(kv.store, key)
Supported Commands:
  • SET key value [PX milliseconds] - Set a key with optional expiry
  • GET key - Retrieve a value
  • DEL key - Delete a key
  • INCR key - Increment a numeric string value
All values are stored as strings. Numeric operations like INCR parse and format strings as needed.

Type Detection

The TYPE command reports a key’s type by checking which map holds the key:
case "TYPE":
    _, ok := kv.store[buff[1]]
    if ok {
        res = []byte("+string\r\n")
    } else {
        _, ok2 := kv.Stream[buff[1]]
        if ok2 {
            res = []byte("+stream\r\n")
        } else {
            res = []byte("+none\r\n")
        }
    }
A key can be:
  • string - exists in the store map
  • stream - exists in the Stream map
  • none - doesn’t exist in either
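The lookup order above can be factored into a small helper. The following is a minimal sketch, not ValKeyper's actual code: the `typeOf` method, the reduced `KVStore`, and the `StreamEntry` field are assumptions made for illustration.

```go
package main

import "fmt"

// StreamEntry is a placeholder; the real type's fields are not shown in this document.
type StreamEntry struct {
	ID string
}

// KVStore carries only the two maps that type detection needs.
type KVStore struct {
	store  map[string]string
	Stream map[string][]StreamEntry
}

// typeOf (hypothetical helper) mirrors the TYPE branch: strings first, then streams.
func (kv *KVStore) typeOf(key string) string {
	if _, ok := kv.store[key]; ok {
		return "string"
	}
	if _, ok := kv.Stream[key]; ok {
		return "stream"
	}
	return "none"
}

func main() {
	kv := &KVStore{
		store:  map[string]string{"greeting": "hello"},
		Stream: map[string][]StreamEntry{"events": {{ID: "1-0"}}},
	}
	for _, k := range []string{"greeting", "events", "missing"} {
		// RESP simple strings start with '+' and end with CRLF.
		fmt.Printf("%q\n", "+"+kv.typeOf(k)+"\r\n")
	}
}
```

Because the string map is checked first, a key present in both maps would be reported as a string.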

Expiry Management

ValKeyper implements key expiration using goroutines and channels:
expiryMap map[string]chan int

func (kv *KVStore) Set(key, value string, expiry int) {
    if expiry != -1 {
        timeout := time.After(time.Duration(expiry) * time.Millisecond)
        go kv.handleExpiry(timeout, key)
    }
    kv.store[key] = value
}

Expiry Handler

Each key with TTL gets a dedicated goroutine:
func (kv *KVStore) handleExpiry(timeout <-chan time.Time, key string) {
    closeCh := make(chan int)
    kv.expiryMap[key] = closeCh
    for {
        select {
        case <-closeCh:
            return  // Key was deleted manually
        case <-timeout:
            delete(kv.store, key)  // TTL expired
        }
    }
}
  1. SET with expiry: the client executes SET key value PX 5000
  2. Create timeout: time.After(5000 * time.Millisecond) creates a timer channel
  3. Spawn goroutine: a new goroutine starts waiting on the timeout
  4. Store value: the key-value pair is stored immediately
  5. Wait for expiry: after 5 seconds, the timeout fires and the key is deleted

Manual Deletion

When a key is explicitly deleted, its expiry goroutine is notified:
case "DEL":
    key := buff[1]
    delete(kv.store, key)
    
    ch, ok := kv.expiryMap[key]
    if ok {
        ch <- 1  // Signal the goroutine to stop
    }
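This approach creates one goroutine per expiring key. While Go’s goroutines are lightweight, this may not scale to millions of keys with TTLs. Note also that the handler shown above keeps looping after the timeout fires, so each goroutine stays alive until its key is removed with DEL. Consider using a heap-based expiry queue for production workloads.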

Stream Entry IDs

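Stream IDs follow the format timestamp-sequence. A bare * requests a fully auto-generated ID, and timestamp-* requests only an auto-generated sequence number: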
// Auto-generate full ID
if buff[2] == "*" {
    buff[2] = fmt.Sprintf("%d-%d", time.Now().UnixMilli(), 0)
}

// Auto-generate sequence number
if currEntry[1] == "*" {
    if lastEntryTime == currEntryTime {
        buff[2] = fmt.Sprintf("%d-%d", lastEntryTime, lastEntrySeq+1)
    } else {
        if currEntryTime == 0 {
            buff[2] = fmt.Sprintf("%d-%d", currEntryTime, 1)
        } else {
            buff[2] = fmt.Sprintf("%d-%d", currEntryTime, 0)
        }
    }
}
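The two branches above can be read as a single pure function. This is a hedged sketch: `resolveID` and its parameters are hypothetical names, and the current time is passed in so the result is reproducible. It follows the snippet's behavior, including starting the sequence at 0 for a bare `*`.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// resolveID (hypothetical helper) fills in the auto-generated parts of an XADD ID.
// lastTime and lastSeq describe the stream's newest entry (0 and 0 for an empty stream);
// nowMs is the current Unix time in milliseconds.
func resolveID(requested string, lastTime, lastSeq, nowMs int64) (string, error) {
	if requested == "*" {
		// Fully automatic: as in the snippet, the sequence always starts at 0.
		return fmt.Sprintf("%d-%d", nowMs, 0), nil
	}
	parts := strings.SplitN(requested, "-", 2)
	if len(parts) != 2 {
		return "", fmt.Errorf("malformed stream ID %q", requested)
	}
	ms, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		return "", fmt.Errorf("malformed stream ID %q: %v", requested, err)
	}
	if parts[1] != "*" {
		return requested, nil // explicit ID: nothing to generate, validation comes later
	}
	switch {
	case ms == lastTime:
		return fmt.Sprintf("%d-%d", ms, lastSeq+1), nil
	case ms == 0:
		return "0-1", nil // 0-0 is not a valid ID, so time 0 starts at sequence 1
	default:
		return fmt.Sprintf("%d-%d", ms, 0), nil
	}
}

func main() {
	for _, req := range []string{"5-*", "9-*", "*", "7-2"} {
		id, err := resolveID(req, 5, 3, 1700000000000)
		fmt.Println(req, "->", id, err)
	}
}
```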

ID Validation

Each new stream ID must be greater than 0-0 and strictly greater than the ID of the stream’s last entry:
// Must be greater than 0-0
if currEntryTime < 1 && currEntrySeq < 1 {
    res = []byte("-ERR The ID specified in XADD must be greater than 0-0\r\n")
    break
}

// Must be greater than last entry
if lastEntryTime > currEntryTime {
    res = []byte("-ERR The ID specified in XADD is equal or smaller than the target stream top item\r\n")
    break
}

if lastEntryTime == currEntryTime && lastEntrySeq >= currEntrySeq {
    res = []byte("-ERR The ID specified in XADD is equal or smaller than the target stream top item\r\n")
    break
}
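The three checks reduce to one ordered comparison on (timestamp, sequence) pairs. The sketch below assumes a small `streamID` struct plus `parseID` and `validateID` helpers, none of which appear in the original code; the error texts are copied from the snippet above.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// streamID is a hypothetical parsed form of "timestamp-sequence".
type streamID struct {
	ms  uint64
	seq uint64
}

var (
	errZeroID     = errors.New("ERR The ID specified in XADD must be greater than 0-0")
	errNotGreater = errors.New("ERR The ID specified in XADD is equal or smaller than the target stream top item")
)

// parseID splits "ms-seq" into its two unsigned parts.
func parseID(s string) (streamID, error) {
	parts := strings.SplitN(s, "-", 2)
	if len(parts) != 2 {
		return streamID{}, fmt.Errorf("malformed stream ID %q", s)
	}
	ms, err := strconv.ParseUint(parts[0], 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("bad timestamp in %q: %v", s, err)
	}
	seq, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("bad sequence in %q: %v", s, err)
	}
	return streamID{ms: ms, seq: seq}, nil
}

// validateID accepts cur only if it is above 0-0 and strictly above last.
// For an empty stream, pass the zero value as last.
func validateID(last, cur streamID) error {
	if cur.ms == 0 && cur.seq == 0 {
		return errZeroID
	}
	if cur.ms < last.ms || (cur.ms == last.ms && cur.seq <= last.seq) {
		return errNotGreater
	}
	return nil
}

func main() {
	last := streamID{ms: 5, seq: 3}
	for _, raw := range []string{"0-0", "5-3", "4-9", "5-4", "6-0"} {
		cur, err := parseID(raw)
		if err != nil {
			fmt.Println(raw, "->", err)
			continue
		}
		fmt.Println(raw, "->", validateID(last, cur))
	}
}
```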

Persistence: RDB Format

ValKeyper can load data from RDB (Redis Database) files:
func (kv *KVStore) LoadFromRDB(rdb *rdb.RDB) {
    if len(rdb.Dbs) < 1 {
        return
    }
    kv.store = rdb.Dbs[0].DbStore

    for _, x := range rdb.Dbs[0].ExpiryStore {
        kv.store[x.Key] = x.Value
        duration := time.Duration(int64(x.Expiry)-time.Now().UnixMilli()) * time.Millisecond
        go kv.handleExpiry(time.After(duration), x.Key)
    }
}

RDB Database Structure

type Database struct {
    Index       int                 // Database number (0-15)
    Size        int                 // Number of keys
    Expiry      int                 // Number of keys with expiry
    DbStore     map[string]string   // Regular keys
    ExpiryStore []expiryEntry       // Keys with expiry times
}

type expiryEntry struct {
    Key    string
    Value  string
    Expiry uint64  // Unix timestamp in milliseconds
}
  1. Parse RDB file using the rdb package
  2. Load regular keys directly into kv.store
  3. Load expiring keys into kv.store
  4. Calculate remaining TTL from stored expiry timestamp
  5. Spawn expiry goroutines for each key with TTL
duration := time.Duration(int64(x.Expiry)-time.Now().UnixMilli()) * time.Millisecond
go kv.handleExpiry(time.After(duration), x.Key)

Transaction Support

ValKeyper implements transactions using the connection state:
type Connection struct {
    Conn       net.Conn
    TxnStarted bool        // Is MULTI active?
    TxnQueue   [][]string  // Queued commands
}

Transaction Flow

case "MULTI":
    connection.TxnStarted = true
    res = []byte("+OK\r\n")
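Only the MULTI branch is shown above. As a rough sketch of how the queue might be used afterwards, the example below adds queuing, EXEC, and DISCARD. The `dispatch` function, the `run` callback, and the reply strings follow Redis conventions and are assumptions, not ValKeyper's confirmed behavior; `Connection` is reduced to its two transaction fields.

```go
package main

import "fmt"

// Connection keeps only the transaction fields from the struct above.
type Connection struct {
	TxnStarted bool
	TxnQueue   [][]string
}

// dispatch (hypothetical) routes one parsed command. run executes a single
// command against the store and returns its RESP reply.
func dispatch(c *Connection, cmd []string, run func([]string) string) string {
	if len(cmd) == 0 {
		return "-ERR empty command\r\n"
	}
	switch cmd[0] {
	case "MULTI":
		c.TxnStarted = true
		return "+OK\r\n"
	case "EXEC":
		if !c.TxnStarted {
			return "-ERR EXEC without MULTI\r\n"
		}
		// Reply is a RESP array with one element per queued command.
		out := fmt.Sprintf("*%d\r\n", len(c.TxnQueue))
		for _, queued := range c.TxnQueue {
			out += run(queued)
		}
		c.TxnStarted, c.TxnQueue = false, nil
		return out
	case "DISCARD":
		if !c.TxnStarted {
			return "-ERR DISCARD without MULTI\r\n"
		}
		c.TxnStarted, c.TxnQueue = false, nil
		return "+OK\r\n"
	}
	if c.TxnStarted {
		// Inside a transaction, commands are queued rather than executed.
		c.TxnQueue = append(c.TxnQueue, cmd)
		return "+QUEUED\r\n"
	}
	return run(cmd)
}

func main() {
	store := map[string]string{}
	run := func(cmd []string) string {
		if cmd[0] == "SET" && len(cmd) == 3 {
			store[cmd[1]] = cmd[2]
			return "+OK\r\n"
		}
		return "-ERR unknown command\r\n"
	}
	c := &Connection{}
	for _, cmd := range [][]string{{"MULTI"}, {"SET", "a", "1"}, {"EXEC"}} {
		fmt.Printf("%q\n", dispatch(c, cmd, run))
	}
}
```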

Numeric Operations

Even though all values are strings, ValKeyper supports numeric operations:
case "INCR":
    v, ok := kv.store[buff[1]]
    if !ok {
        kv.store[buff[1]] = "1"
    } else {
        val, err := strconv.Atoi(v)
        if err == nil {
            kv.store[buff[1]] = fmt.Sprintf("%d", val+1)
        } else {
            res = []byte("-ERR value is not an integer or out of range\r\n")
            break
        }
    }
    res = []byte(fmt.Sprintf(":%s\r\n", kv.store[buff[1]]))
The INCR command:
  1. Checks if the key exists
  2. If not, initializes to “1”
  3. If yes, attempts to parse as integer
  4. Increments and stores as string
  5. Returns error if value is not numeric
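The steps above can be sketched as a standalone function over a plain map. This variant is an assumption-laden illustration, not the project's code: `incr` is a hypothetical name, and it uses `strconv.ParseInt` with an explicit overflow check, which the `strconv.Atoi` version above does not perform.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"strconv"
)

var errNotInteger = errors.New("ERR value is not an integer or out of range")

// incr (hypothetical helper) increments the decimal string stored at key.
// A missing key is initialized to "1". The map is left untouched on error.
func incr(store map[string]string, key string) (int64, error) {
	cur, ok := store[key]
	if !ok {
		store[key] = "1"
		return 1, nil
	}
	n, err := strconv.ParseInt(cur, 10, 64)
	if err != nil || n == math.MaxInt64 {
		// Non-numeric value, or incrementing would overflow int64.
		return 0, errNotInteger
	}
	n++
	store[key] = strconv.FormatInt(n, 10)
	return n, nil
}

func main() {
	store := map[string]string{"visits": "41", "name": "alice"}
	for _, key := range []string{"visits", "fresh", "name"} {
		n, err := incr(store, key)
		if err != nil {
			fmt.Printf("%q\n", "-"+err.Error()+"\r\n")
			continue
		}
		// RESP integers start with ':' and end with CRLF.
		fmt.Printf("%q\n", fmt.Sprintf(":%d\r\n", n))
	}
}
```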

Memory Model

No Persistence

Data lives entirely in memory; a restarted server loses its data unless an RDB file is loaded

Single-Threaded Ops

Each connection’s commands execute sequentially in its goroutine

Goroutine-per-Key TTL

Each expiring key consumes a goroutine and channel

No Explicit Locking

Shared maps are accessed concurrently without mutexes, so writes from different connection and expiry goroutines can race
For production use, consider:
  • Adding mutex protection for the shared store map
  • Implementing a more scalable expiry mechanism (e.g., timer wheel)
  • Periodic RDB snapshots for durability
  • Memory limits and eviction policies

Data Initialization

A new store is created with sensible defaults:
func New() *KVStore {
    return &KVStore{
        Info: Info{
            Role:             "master",
            MasterReplId:     "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb",
            MasterReplOffSet: 0,
            Port:             "6379",
        },
        store:     make(map[string]string),
        expiryMap: make(map[string]chan int),
        AckCh:     make(chan int),
        Stream:    make(map[string][]StreamEntry),
        StreamXCh: make(chan []byte),
    }
}
All maps are initialized empty, ready to accept data from:
  • Client commands (SET, XADD, etc.)
  • RDB file loading (--dir and --dbfilename flags)
  • Master replication (--replicaof flag)
