Overview
Streams in ValKeyper are append-only logs of time-ordered entries. Each entry has a unique ID in the format timestamp-sequence (for example, 1234567890-0) and holds one or more field-value pairs.
Stream Data Structure
ValKeyper stores streams in memory using the following structure:
type StreamEntry struct {
    Id   string
    Pair map[string]string
}

type KVStore struct {
    // ...
    Stream    map[string][]StreamEntry
    StreamXCh chan []byte // Channel for blocking reads
}
The Stream field is a map where:
Key : Stream name (string)
Value : Slice of StreamEntry objects in append-only order
Each StreamEntry contains:
Id : Unique identifier in timestamp-sequence format
Pair : Map of field names to values
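To make the layout concrete, here is a minimal sketch of appending to and iterating over this structure. The NewKVStore constructor and the Append method are hypothetical helpers introduced only for illustration; they are not taken from ValKeyper's code, and KVStore is reduced to its stream field.

```go
package main

import "fmt"

// StreamEntry mirrors the struct shown above.
type StreamEntry struct {
	Id   string
	Pair map[string]string
}

// KVStore is reduced to the stream-related field for this sketch.
type KVStore struct {
	Stream map[string][]StreamEntry
}

// NewKVStore is a hypothetical constructor that initializes the map.
func NewKVStore() *KVStore {
	return &KVStore{Stream: make(map[string][]StreamEntry)}
}

// Append is a hypothetical helper: it adds an entry to the end of the
// named stream, creating the stream on first use.
func (kv *KVStore) Append(name, id string, pair map[string]string) {
	kv.Stream[name] = append(kv.Stream[name], StreamEntry{Id: id, Pair: pair})
}

func main() {
	kv := NewKVStore()
	kv.Append("sensors", "1-0", map[string]string{"temperature": "22.5"})
	kv.Append("sensors", "1-1", map[string]string{"temperature": "23.0"})

	// Entries come back in the order they were appended.
	for _, e := range kv.Stream["sensors"] {
		fmt.Println(e.Id, e.Pair["temperature"])
	}
}
```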
XADD Command
The XADD command appends a new entry to a stream.
Syntax
XADD stream_name entry_id field value [field value ...]
Auto-Generated IDs
You can use * for automatic ID generation:
XADD mystream * temperature 23.5 humidity 65
When * is used, ValKeyper builds the ID from the current Unix timestamp in milliseconds and a sequence number of 0:
if buff[2] == "*" {
    buff[2] = fmt.Sprintf("%d-%d", time.Now().UnixMilli(), 0)
}
Partial Auto-Generation
You can specify the timestamp and use * for the sequence:
XADD mystream 1234567890-* temperature 24.0
Implementation logic:
if currEntry[1] == "*" {
    if lastEntryTime == currEntryTime {
        // Increment sequence if timestamp matches
        buff[2] = fmt.Sprintf("%d-%d", lastEntryTime, lastEntrySeq+1)
    } else {
        // Use 0 or 1 based on timestamp
        if currEntryTime == 0 {
            buff[2] = fmt.Sprintf("%d-%d", currEntryTime, 1)
        } else {
            buff[2] = fmt.Sprintf("%d-%d", currEntryTime, 0)
        }
    }
}
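The two generation modes can be folded into a single helper. The sketch below is an illustration under stated assumptions, not ValKeyper's actual function: resolveID and its parameters are hypothetical names, the clock value is passed in as nowMillis (standing in for time.Now().UnixMilli()) so the result is deterministic, and IDs are parsed with strconv.ParseInt.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// resolveID is a hypothetical helper that expands "*" and "<ms>-*" into a
// concrete ID, following the rules in the snippets above. lastTime and
// lastSeq describe the newest entry already in the stream.
func resolveID(raw string, lastTime, lastSeq, nowMillis int64) (string, error) {
	if raw == "*" {
		return fmt.Sprintf("%d-%d", nowMillis, 0), nil
	}
	parts := strings.SplitN(raw, "-", 2)
	if len(parts) != 2 {
		return "", fmt.Errorf("malformed ID %q", raw)
	}
	if parts[1] != "*" {
		return raw, nil // explicit ID, nothing to generate
	}
	ts, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		return "", fmt.Errorf("malformed timestamp in %q: %w", raw, err)
	}
	switch {
	case ts == lastTime:
		// Same millisecond as the last entry: bump the sequence.
		return fmt.Sprintf("%d-%d", ts, lastSeq+1), nil
	case ts == 0:
		// 0-0 is rejected by validation, so timestamp 0 starts at 1.
		return "0-1", nil
	default:
		return fmt.Sprintf("%d-%d", ts, 0), nil
	}
}

func main() {
	// Assume the last entry is 5-3 and the clock reads 1000 ms.
	for _, raw := range []string{"*", "5-*", "7-*", "0-*", "9-2"} {
		id, err := resolveID(raw, 5, 3, 1000)
		fmt.Println(raw, "->", id, err)
	}
}
```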
ID Validation
ValKeyper enforces strict ordering rules:
Minimum ID : Entry IDs must be greater than 0-0
if currEntryTime < 1 && currEntrySeq < 1 {
    return "-ERR The ID specified in XADD must be greater than 0-0\r\n"
}
Monotonic Ordering : New IDs must be greater than the last entry
if lastEntryTime > currEntryTime {
    return "-ERR The ID specified in XADD is equal or smaller than the target stream top item\r\n"
}
if lastEntryTime == currEntryTime && lastEntrySeq >= currEntrySeq {
    return "-ERR The ID specified in XADD is equal or smaller than the target stream top item\r\n"
}
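Both rules can be expressed as one function that returns an error instead of a raw reply string. This is a sketch only: validateID and the two error variables are hypothetical names, and the int64 parameters are an assumption about how the parsed ID parts are represented.

```go
package main

import (
	"errors"
	"fmt"
)

// Error texts copied from the replies shown above (without the RESP framing).
var (
	errIDTooSmall   = errors.New("ERR The ID specified in XADD must be greater than 0-0")
	errIDNotGreater = errors.New("ERR The ID specified in XADD is equal or smaller than the target stream top item")
)

// validateID is a hypothetical helper that applies the minimum-ID rule and
// then the monotonic-ordering rule against the stream's last entry.
func validateID(lastTime, lastSeq, currTime, currSeq int64) error {
	if currTime < 1 && currSeq < 1 {
		return errIDTooSmall
	}
	if lastTime > currTime || (lastTime == currTime && lastSeq >= currSeq) {
		return errIDNotGreater
	}
	return nil
}

func main() {
	// Assume the last entry in the stream is 5-3.
	fmt.Println(validateID(5, 3, 0, 0)) // rejected: not greater than 0-0
	fmt.Println(validateID(5, 3, 5, 3)) // rejected: equal to the top item
	fmt.Println(validateID(5, 3, 5, 4)) // accepted: <nil>
}
```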
Example
# Add entries with auto-generated IDs
XADD sensors * temperature 22.5 location "room1"
XADD sensors * temperature 23.0 location "room2"
# Add entry with explicit ID
XADD sensors 1234567890-0 temperature 24.5 location "room3"
XRANGE Command
The XRANGE command queries a range of stream entries.
Syntax
XRANGE stream_name start end
Special Range Values
- : Minimum possible ID (0-0)
+ : Maximum possible ID
Implementation
ValKeyper converts + to an ID whose timestamp and sequence are both math.MaxInt64:
if buff[3] == "+" {
    buff[3] = fmt.Sprintf("%d-%d", math.MaxInt64, math.MaxInt64)
}
Range filtering logic:
for _, se := range kv.Stream[key] {
    curr := strings.Split(se.Id, "-")
    currTime, _ := strconv.Atoi(curr[0])
    currSeq, _ := strconv.Atoi(curr[1])
    if currTime >= startTime && currSeq >= startSeq &&
        currTime <= endTime && currSeq <= endSeq {
        // Include this entry in results
    }
}
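As written, the condition above checks the timestamp and sequence bounds independently. Under that reading, an entry 5-0 falls outside the range 1-3 to 9-9 because its sequence 0 is below the start sequence 3, even though 5-0 sorts between the two IDs. If ordering by (timestamp, sequence) pairs is what is wanted, one alternative is to compare IDs lexicographically. The sketch below shows that alternative; streamID, compareIDs, and inRange are hypothetical names and are not part of ValKeyper.

```go
package main

import "fmt"

// streamID is a hypothetical parsed form of a "<ms>-<seq>" ID.
type streamID struct {
	Time, Seq int64
}

// compareIDs orders IDs by timestamp first, then by sequence.
// It returns -1, 0, or 1 when a is less than, equal to, or greater than b.
func compareIDs(a, b streamID) int {
	switch {
	case a.Time < b.Time:
		return -1
	case a.Time > b.Time:
		return 1
	case a.Seq < b.Seq:
		return -1
	case a.Seq > b.Seq:
		return 1
	default:
		return 0
	}
}

// inRange reports whether start <= id <= end under that ordering.
func inRange(id, start, end streamID) bool {
	return compareIDs(id, start) >= 0 && compareIDs(id, end) <= 0
}

func main() {
	start, end := streamID{1, 3}, streamID{9, 9}
	fmt.Println(inRange(streamID{5, 0}, start, end)) // true
	fmt.Println(inRange(streamID{1, 2}, start, end)) // false
}
```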
Example
# Get all entries
XRANGE mystream - +
# Get entries in a specific timestamp range
XRANGE mystream 1234567890-0 1234567999-0
# Get entries from a specific ID onwards
XRANGE mystream 1234567890-5 +
XREAD Command
The XREAD command reads entries from one or more streams.
Syntax
# Non-blocking read
XREAD streams stream_name [stream_name ...] id [id ...]
# Blocking read
XREAD block timeout streams stream_name [stream_name ...] id [id ...]
Blocking Behavior
ValKeyper implements blocking reads using Go channels:
StreamXCh chan []byte // Notification channel
When XADD is called, it attempts to notify blocked readers:
select {
case kv.StreamXCh <- res:
    fmt.Println("someones there, sent...")
default:
    fmt.Println("no one on the other side")
}
Blocking read implementation:
if buff[1] == "block" {
    waitTime, _ := strconv.Atoi(buff[2])
    timeoutCh := time.After(time.Duration(waitTime) * time.Millisecond)
    if waitTime != 0 {
        select {
        case <-kv.StreamXCh:
            // New entry available
        case <-timeoutCh:
            return "$-1\r\n" // Timeout
        }
    } else {
        // Block indefinitely (waitTime = 0)
        <-kv.StreamXCh
    }
}
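The notify-and-wait pattern above can be tried in isolation. The following is a minimal sketch, assuming a plain chan []byte shared between writer and reader; waitForEntry and notify are hypothetical helper names, and the sketch uses time.NewTimer so the timer is only created when a timeout is requested.

```go
package main

import (
	"fmt"
	"time"
)

// waitForEntry is a hypothetical helper that blocks until a notification
// arrives or the timeout elapses. A timeout of 0 blocks indefinitely.
// The boolean is false when the wait timed out.
func waitForEntry(ch <-chan []byte, timeout time.Duration) ([]byte, bool) {
	if timeout == 0 {
		return <-ch, true
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case msg := <-ch:
		return msg, true
	case <-timer.C:
		return nil, false
	}
}

// notify mirrors the non-blocking send used by XADD above: it reports
// whether the message could be handed over without blocking.
func notify(ch chan<- []byte, msg []byte) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan []byte)

	// Writer: retry until the blocked reader picks the message up.
	go func() {
		for !notify(ch, []byte("1-0")) {
			time.Sleep(time.Millisecond)
		}
	}()

	msg, ok := waitForEntry(ch, time.Second)
	fmt.Println(string(msg), ok)

	// Nothing is sent this time, so the wait times out.
	_, ok = waitForEntry(ch, 20*time.Millisecond)
	fmt.Println(ok)
}
```

One consequence of the non-blocking send is worth keeping in mind: if the channel is unbuffered and no reader is waiting at that moment, the notification is dropped, and a single send wakes at most one blocked reader.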
Special ID Values
$ : Only return entries added after the command was issued
Specific ID : Return entries with IDs greater than the specified ID
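The source does not show how $ is resolved. One plausible approach, sketched here purely as an assumption, is to replace $ with the ID of the stream's newest entry at the moment the command arrives, so that the strictly-greater filter only matches entries added afterwards. resolveReadID is a hypothetical name.

```go
package main

import "fmt"

// StreamEntry mirrors the struct from the data structure section.
type StreamEntry struct {
	Id   string
	Pair map[string]string
}

// resolveReadID is a hypothetical helper: it turns "$" into the ID of the
// newest entry (or "0-0" for an empty stream) and leaves any other ID as is.
func resolveReadID(entries []StreamEntry, raw string) string {
	if raw != "$" {
		return raw
	}
	if len(entries) == 0 {
		return "0-0"
	}
	return entries[len(entries)-1].Id
}

func main() {
	entries := []StreamEntry{{Id: "1-0"}, {Id: "2-5"}}
	fmt.Println(resolveReadID(entries, "$"))   // 2-5
	fmt.Println(resolveReadID(entries, "1-0")) // 1-0
	fmt.Println(resolveReadID(nil, "$"))       // 0-0
}
```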
Example
# Read all entries with IDs greater than 0-0 (non-blocking)
XREAD streams mystream 0-0
# Block for 1000ms waiting for new entries
XREAD block 1000 streams mystream $
# Block indefinitely (timeout = 0)
XREAD block 0 streams mystream $
# Read from multiple streams
XREAD streams stream1 stream2 0-0 0-0
Stream Entry Filtering
When reading with a threshold ID, ValKeyper filters entries using:
threshold := strings.Split(ids[i], "-")
thresholdTime, _ := strconv.Atoi(threshold[0])
thresholdSeq, _ := strconv.Atoi(threshold[1])
for _, se := range kv.Stream[key] {
    curr := strings.Split(se.Id, "-")
    currTime, _ := strconv.Atoi(curr[0])
    currSeq, _ := strconv.Atoi(curr[1])
    // Return entries AFTER the threshold
    if (currTime == thresholdTime && currSeq > thresholdSeq) ||
        (currTime > thresholdTime) {
        // Include entry
    }
}
Only entries with IDs strictly greater than the threshold are returned; an entry whose ID equals the threshold is excluded.
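The same filter can be written as a standalone function that also reports malformed IDs instead of ignoring parse errors. This is a sketch under assumptions: parseID and entriesAfter are hypothetical names, and IDs are parsed as int64 with strconv.ParseInt.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// StreamEntry mirrors the struct from the data structure section.
type StreamEntry struct {
	Id   string
	Pair map[string]string
}

// parseID is a hypothetical helper that splits "<ms>-<seq>" into integers.
func parseID(id string) (ts, seq int64, err error) {
	parts := strings.SplitN(id, "-", 2)
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("malformed ID %q", id)
	}
	if ts, err = strconv.ParseInt(parts[0], 10, 64); err != nil {
		return 0, 0, err
	}
	if seq, err = strconv.ParseInt(parts[1], 10, 64); err != nil {
		return 0, 0, err
	}
	return ts, seq, nil
}

// entriesAfter returns the entries whose ID is strictly greater than the
// threshold, using the same condition as the snippet above.
func entriesAfter(entries []StreamEntry, threshold string) ([]StreamEntry, error) {
	tTime, tSeq, err := parseID(threshold)
	if err != nil {
		return nil, err
	}
	var out []StreamEntry
	for _, e := range entries {
		cTime, cSeq, err := parseID(e.Id)
		if err != nil {
			return nil, err
		}
		if cTime > tTime || (cTime == tTime && cSeq > tSeq) {
			out = append(out, e)
		}
	}
	return out, nil
}

func main() {
	entries := []StreamEntry{{Id: "1-0"}, {Id: "1-1"}, {Id: "2-0"}}
	after, err := entriesAfter(entries, "1-0")
	fmt.Println(len(after), err) // 1-0 itself is excluded
	for _, e := range after {
		fmt.Println(e.Id)
	}
}
```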
Best Practices
Use Auto-Generation : Let ValKeyper generate IDs automatically with * to ensure proper ordering and avoid ID conflicts.
Blocking Reads : Use blocking reads with appropriate timeouts to implement real-time stream consumers efficiently.
Range Queries : Use XRANGE with specific time ranges to query historical data efficiently.
Monitor Growth : Streams grow indefinitely in memory. Implement cleanup strategies for production use.
In-Memory Storage : All stream data is stored in memory as a slice of entries
Linear Scanning : Range queries iterate through entries linearly
Channel-Based Blocking : Blocking reads use Go channels with minimal overhead
No Persistence : Stream data is not persisted to disk in the current implementation
Streams in ValKeyper are stored entirely in memory and are not persisted. Restarting the server will lose all stream data.
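Because streams grow without bound, one possible cleanup strategy is to cap each stream at a maximum length and discard the oldest entries. The sketch below illustrates that idea only; trimToMaxLen is a hypothetical function, not an existing ValKeyper feature, and the caller is assumed to hold whatever lock protects the stream.

```go
package main

import "fmt"

// StreamEntry mirrors the struct from the data structure section.
type StreamEntry struct {
	Id   string
	Pair map[string]string
}

// trimToMaxLen is a hypothetical helper that keeps only the newest maxLen
// entries. It copies them into a fresh slice so the old backing array,
// and the entries dropped from it, can be garbage collected.
func trimToMaxLen(entries []StreamEntry, maxLen int) []StreamEntry {
	if maxLen < 0 {
		maxLen = 0
	}
	if len(entries) <= maxLen {
		return entries
	}
	trimmed := make([]StreamEntry, maxLen)
	copy(trimmed, entries[len(entries)-maxLen:])
	return trimmed
}

func main() {
	entries := []StreamEntry{{Id: "1-0"}, {Id: "2-0"}, {Id: "3-0"}, {Id: "4-0"}}
	entries = trimToMaxLen(entries, 2)
	for _, e := range entries {
		fmt.Println(e.Id)
	}
}
```

Calling such a helper after each append keeps memory bounded at the cost of losing the oldest history, so the cap should reflect how far back consumers need to read.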