Skip to main content

Overview

Streams in ValKeyper provide an append-only log data structure that allows you to store time-ordered entries with key-value pairs. Each stream entry has a unique ID in the format timestamp-sequence and can contain multiple field-value pairs.

Stream Data Structure

ValKeyper stores streams in memory using the following structure:
type StreamEntry struct {
    Id   string
    Pair map[string]string
}

type KVStore struct {
    // ...
    Stream    map[string][]StreamEntry
    StreamXCh chan []byte  // Channel for blocking reads
}
The Stream field is a map where:
  • Key: Stream name (string)
  • Value: Slice of StreamEntry objects in append-only order
Each StreamEntry contains:
  • Id: Unique identifier in timestamp-sequence format
  • Pair: Map of field names to values

XADD Command

The XADD command appends a new entry to a stream.

Syntax

XADD stream_name entry_id field value [field value ...]

Auto-Generated IDs

You can use * for automatic ID generation:
XADD mystream * temperature 23.5 humidity 65
When * is used, ValKeyper generates an ID using the current Unix timestamp in milliseconds:
if buff[2] == "*" {
    buff[2] = fmt.Sprintf("%d-%d", time.Now().UnixMilli(), 0)
}

Partial Auto-Generation

You can specify the timestamp and use * for the sequence:
XADD mystream 1234567890-* temperature 24.0
Implementation logic:
if currEntry[1] == "*" {
    if lastEntryTime == currEntryTime {
        // Increment sequence if timestamp matches
        buff[2] = fmt.Sprintf("%d-%d", lastEntryTime, lastEntrySeq+1)
    } else {
        // Use 0 or 1 based on timestamp
        if currEntryTime == 0 {
            buff[2] = fmt.Sprintf("%d-%d", currEntryTime, 1)
        } else {
            buff[2] = fmt.Sprintf("%d-%d", currEntryTime, 0)
        }
    }
}

ID Validation

ValKeyper enforces strict ordering rules:
  1. Minimum ID: Entry IDs must be greater than 0-0
if currEntryTime < 1 && currEntrySeq < 1 {
    return "-ERR The ID specified in XADD must be greater than 0-0\r\n"
}
  1. Monotonic Ordering: New IDs must be greater than the last entry
if lastEntryTime > currEntryTime {
    return "-ERR The ID specified in XADD is equal or smaller than the target stream top item\r\n"
}
if lastEntryTime == currEntryTime && lastEntrySeq >= currEntrySeq {
    return "-ERR The ID specified in XADD is equal or smaller than the target stream top item\r\n"
}

Example

# Add entries with auto-generated IDs
XADD sensors * temperature 22.5 location "room1"
XADD sensors * temperature 23.0 location "room2"

# Add entry with explicit ID
XADD sensors 1234567890-0 temperature 24.5 location "room3"

XRANGE Command

The XRANGE command queries a range of stream entries.

Syntax

XRANGE stream_name start end

Special Range Values

  • -: Minimum possible ID (0-0)
  • +: Maximum possible ID

Implementation

ValKeyper converts + to the maximum integer value:
if buff[3] == "+" {
    buff[3] = fmt.Sprintf("%d-%d", math.MaxInt64, math.MaxInt64)
}
Range filtering logic:
for _, se := range kv.Stream[key] {
    curr := strings.Split(se.Id, "-")
    currTime, _ := strconv.Atoi(curr[0])
    currSeq, _ := strconv.Atoi(curr[1])
    
    if currTime >= startTime && currSeq >= startSeq && 
       currTime <= endTime && currSeq <= endSeq {
        // Include this entry in results
    }
}

Example

# Get all entries
XRANGE mystream - +

# Get entries in a specific timestamp range
XRANGE mystream 1234567890-0 1234567999-0

# Get entries from a specific ID onwards
XRANGE mystream 1234567890-5 +

XREAD Command

The XREAD command reads entries from one or more streams.

Syntax

# Non-blocking read
XREAD streams stream_name [stream_name ...] id [id ...]

# Blocking read
XREAD block timeout streams stream_name [stream_name ...] id [id ...]

Blocking Behavior

ValKeyper implements blocking reads using Go channels:
StreamXCh chan []byte  // Notification channel
When XADD is called, it attempts to notify blocked readers:
select {
case kv.StreamXCh <- res:
    fmt.Println("someones there, sent...")
default:
    fmt.Println("no one on the other side")
}
Blocking read implementation:
if buff[1] == "block" {
    waitTime, _ := strconv.Atoi(buff[2])
    timeoutCh := time.After(time.Duration(waitTime) * time.Millisecond)
    
    if waitTime != 0 {
        select {
        case <-kv.StreamXCh:
            // New entry available
        case <-timeoutCh:
            return "$-1\r\n"  // Timeout
        }
    } else {
        // Block indefinitely (waitTime = 0)
        <-kv.StreamXCh
    }
}

Special ID Values

  • $: Only return entries added after the command was issued
  • Specific ID: Return entries with IDs greater than the specified ID

Example

# Read all new entries (non-blocking)
XREAD streams mystream 0-0

# Block for 1000ms waiting for new entries
XREAD block 1000 streams mystream $

# Block indefinitely (timeout = 0)
XREAD block 0 streams mystream $

# Read from multiple streams
XREAD streams stream1 stream2 0-0 0-0

Stream Entry Filtering

When reading with a threshold ID, ValKeyper filters entries using:
threshold := strings.Split(ids[i], "-")
thresholdTime, _ := strconv.Atoi(threshold[0])
thresholdSeq, _ := strconv.Atoi(threshold[1])

for _, se := range kv.Stream[key] {
    curr := strings.Split(se.Id, "-")
    currTime, _ := strconv.Atoi(curr[0])
    currSeq, _ := strconv.Atoi(curr[1])
    
    // Return entries AFTER the threshold
    if (currTime == thresholdTime && currSeq > thresholdSeq) || 
       (currTime > thresholdTime) {
        // Include entry
    }
}
The filtering logic ensures that only entries strictly greater than the specified ID are returned, not equal to it.

Best Practices

Use Auto-Generation

Let ValKeyper generate IDs automatically with * to ensure proper ordering and avoid ID conflicts.

Blocking Reads

Use blocking reads with appropriate timeouts to implement real-time stream consumers efficiently.

Range Queries

Use XRANGE with specific time ranges to query historical data efficiently.

Monitor Growth

Streams grow indefinitely in memory. Implement cleanup strategies for production use.

Performance Considerations

  • In-Memory Storage: All stream data is stored in memory as a slice of entries
  • Linear Scanning: Range queries iterate through entries linearly
  • Channel-Based Blocking: Blocking reads use Go channels with minimal overhead
  • No Persistence: Stream data is not persisted to disk in the current implementation
Streams in ValKeyper are stored entirely in memory and are not persisted. Restarting the server will lose all stream data.

Build docs developers (and LLMs) love