Streams are ValKeyper’s append-only log data structure. They are designed for managing ordered sequences of entries with unique IDs, supporting both historical queries and real-time blocking reads.

XADD

Appends a new entry to a stream. Creates the stream if it doesn’t exist.

Syntax

XADD key ID field value [field value ...]

Parameters

key
string
required
The name of the stream
ID
string
required
The entry ID in the format timestamp-sequence. Use * for auto-generation, or timestamp-* to auto-generate the sequence number.
field value
string
required
One or more field-value pairs to store in the entry. At least one pair is required.

Return Value

Bulk string reply: The ID of the added entry.

ID Generation

  • * - Auto-generates both timestamp (current time in ms) and sequence (0)
  • timestamp-* - Uses the provided timestamp, auto-generates sequence
  • timestamp-sequence - Uses both provided values
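
The three ID forms above can be sketched in Go as follows. This is a minimal illustration, not the code in store.go: `streamID`, `resolveID`, and `fixedClock` are hypothetical names, and the rule used for `timestamp-*` (continue the sequence when the timestamp equals that of the last entry, otherwise start at 0) is an assumption inferred from the examples on this page.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// streamID is a hypothetical parsed form of a "timestamp-sequence" ID.
type streamID struct {
	ms, seq uint64
}

func (id streamID) String() string { return fmt.Sprintf("%d-%d", id.ms, id.seq) }

// fixedClock returns a clock that always reports the given Unix time in
// milliseconds, so the "*" case is deterministic in examples.
func fixedClock(ms int64) func() time.Time {
	return func() time.Time { return time.UnixMilli(ms) }
}

// resolveID turns the ID argument of XADD into a concrete ID. last is the ID
// of the newest entry (the zero value for an empty stream).
func resolveID(spec string, last streamID, now func() time.Time) (streamID, error) {
	if spec == "*" {
		// Full auto-generation: current time in ms, sequence 0.
		return streamID{ms: uint64(now().UnixMilli()), seq: 0}, nil
	}
	msPart, seqPart, found := strings.Cut(spec, "-")
	if !found {
		return streamID{}, fmt.Errorf("invalid stream ID %q", spec)
	}
	ms, err := strconv.ParseUint(msPart, 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("invalid timestamp in %q: %w", spec, err)
	}
	if seqPart == "*" {
		// Assumed rule: same timestamp as the last entry continues its
		// sequence; a different timestamp starts again at 0.
		if ms == last.ms {
			return streamID{ms: ms, seq: last.seq + 1}, nil
		}
		return streamID{ms: ms, seq: 0}, nil
	}
	seq, err := strconv.ParseUint(seqPart, 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("invalid sequence in %q: %w", spec, err)
	}
	return streamID{ms: ms, seq: seq}, nil
}

func main() {
	now := fixedClock(1234567890000)
	last := streamID{}
	for _, spec := range []string{"*", "1234567891000-*", "1234567891000-*", "1234567892000-0"} {
		id, err := resolveID(spec, last, now)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(spec, "->", id)
		last = id
	}
}
```

Passing the clock in as a function is a design choice for this sketch only; it keeps the `*` case reproducible without changing the rules being illustrated.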

Errors

  • -ERR The ID specified in XADD must be greater than 0-0
  • -ERR The ID specified in XADD is equal or smaller than the target stream top item
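
The two error conditions above amount to an ordering check on (timestamp, sequence) pairs. A minimal Go sketch, assuming a hypothetical `streamID` type and `validateID` helper (neither name comes from store.go), with the error text copied from the list above:

```go
package main

import (
	"errors"
	"fmt"
)

// streamID is a hypothetical parsed form of a "timestamp-sequence" ID.
type streamID struct {
	ms, seq uint64
}

var (
	errZeroID     = errors.New("ERR The ID specified in XADD must be greater than 0-0")
	errNotGreater = errors.New("ERR The ID specified in XADD is equal or smaller than the target stream top item")
)

// validateID reports whether id may be appended after last, where last is the
// ID of the newest entry (the zero value for an empty stream).
func validateID(id, last streamID) error {
	if id == (streamID{}) {
		return errZeroID
	}
	// IDs are ordered by timestamp first, then by sequence.
	if id.ms < last.ms || (id.ms == last.ms && id.seq <= last.seq) {
		return errNotGreater
	}
	return nil
}

func main() {
	last := streamID{ms: 1234567892000, seq: 0}
	fmt.Println(validateID(streamID{ms: 1234567891000}, last)) // older than the top item
	fmt.Println(validateID(streamID{}, streamID{}))            // 0-0 is never allowed
	fmt.Println(validateID(streamID{ms: 1234567893000}, last)) // accepted
}
```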

Examples

# Add entry with auto-generated ID
XADD mystream * temperature 20 humidity 65
# Response: "1234567890000-0"

# Add entry with specific timestamp, auto sequence
XADD mystream 1234567891000-* sensor "sensor-1" status "active"
# Response: "1234567891000-0"

# Add another entry (sequence auto-increments)
XADD mystream 1234567891000-* reading 25
# Response: "1234567891000-1"

# Add entry with explicit ID
XADD mystream 1234567892000-0 event "completed"
# Response: "1234567892000-0"

# Error: ID must be greater than previous
XADD mystream 1234567891000-0 test "value"
# Response: -ERR The ID specified in XADD is equal or smaller than the target stream top item

Implementation Details

From store.go:317-384, the implementation handles:
  1. Auto ID generation (lines 318-320):
    if buff[2] == "*" {
        buff[2] = fmt.Sprintf("%d-%d", time.Now().UnixMilli(), 0)
    }
    
  2. Sequence auto-increment (lines 331-344): When using timestamp-*, the sequence is set automatically based on the last entry
  3. ID validation (lines 347-358): Ensures new IDs are greater than the last entry and greater than 0-0
  4. Notification (lines 379-384): Signals blocked XREAD commands when new data arrives
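
One common way to implement the notification step in Go is a non-blocking channel send, so that the writer never stalls when no reader is waiting. The sketch below illustrates that pattern only; `notifyReaders` is a hypothetical helper, and whether store.go sends on its channel this way is not shown on this page.

```go
package main

import "fmt"

// notifyReaders tries to signal one waiting reader. It returns true if the
// signal was delivered (or buffered) and false if nobody could take it.
func notifyReaders(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		// No reader is waiting and the buffer (if any) is full: drop the
		// signal instead of blocking the writer.
		return false
	}
}

func main() {
	// A buffer of one lets a single pending signal wait for the next reader.
	ch := make(chan struct{}, 1)
	fmt.Println(notifyReaders(ch)) // true: signal buffered
	fmt.Println(notifyReaders(ch)) // false: buffer already full
	<-ch                           // a reader consumes the signal
	fmt.Println(notifyReaders(ch)) // true again
}
```

With an unbuffered channel the same function succeeds only while a reader is actually blocked on a receive, which is the trade-off to weigh when choosing the buffer size.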

XRANGE

Returns a range of entries from a stream between two IDs.

Syntax

XRANGE key start end

Parameters

key
string
required
The name of the stream
start
string
required
Minimum entry ID (inclusive). Use - for the smallest ID in the stream.
end
string
required
Maximum entry ID (inclusive). Use + for the largest ID in the stream.

Return Value

Array reply: List of entries, where each entry is a two-element array:
  1. The entry ID
  2. An array of field-value pairs
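
On the wire, this nested reply is a RESP array of two-element arrays. The Go sketch below shows how such a reply could be serialized; `entry`, `bulk`, and `encodeEntries` are hypothetical names chosen for illustration and are not taken from store.go.

```go
package main

import (
	"fmt"
	"strings"
)

// entry is a hypothetical in-memory form of one stream entry. Fields holds
// the pairs flattened as field, value, field, value, ...
type entry struct {
	ID     string
	Fields []string
}

// bulk encodes s as a RESP bulk string.
func bulk(s string) string {
	return fmt.Sprintf("$%d\r\n%s\r\n", len(s), s)
}

// encodeEntries encodes entries as an array of [ID, [field, value, ...]].
func encodeEntries(entries []entry) string {
	var b strings.Builder
	fmt.Fprintf(&b, "*%d\r\n", len(entries))
	for _, e := range entries {
		b.WriteString("*2\r\n") // each entry is a two-element array
		b.WriteString(bulk(e.ID))
		fmt.Fprintf(&b, "*%d\r\n", len(e.Fields))
		for _, f := range e.Fields {
			b.WriteString(bulk(f))
		}
	}
	return b.String()
}

func main() {
	reply := encodeEntries([]entry{
		{ID: "1234567890000-0", Fields: []string{"sensor", "A", "value", "10"}},
	})
	fmt.Printf("%q\n", reply)
}
```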

Examples

# Add some entries
XADD mystream * sensor A value 10
XADD mystream * sensor B value 20
XADD mystream * sensor C value 30

# Get all entries
XRANGE mystream - +
# Response:
# 1) 1) "1234567890000-0"
#    2) 1) "sensor"
#       2) "A"
#       3) "value"
#       4) "10"
# 2) 1) "1234567890001-0"
#    2) 1) "sensor"
#       2) "B"
#       3) "value"
#       4) "20"
# 3) 1) "1234567890002-0"
#    2) 1) "sensor"
#       2) "C"
#       3) "value"
#       4) "30"

# Get entries in a specific range
XRANGE mystream 1234567890000-0 1234567890001-0
# Response: (first two entries)

# Get entries from a timestamp
XRANGE mystream 1234567890001 +
# Response: (last two entries)

Implementation Details

From store.go:385-426, the implementation:
  1. Handles special bounds (lines 388-390): Converts + to the maximum possible ID
  2. Parses IDs (lines 387-402): Splits IDs into timestamp and sequence components
  3. Filters entries (lines 408-422): Includes entries where both timestamp and sequence are within range
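
The steps above can be sketched as a standalone Go function. This is an illustration under stated assumptions, not the code in store.go: `parseBound`, `compare`, and `xrange` are hypothetical names, a bound given without a sequence is assumed to default to 0 for start and to the maximum value for end, and IDs are compared as ordered (timestamp, sequence) pairs, which may differ from a filter that checks each component separately.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

type streamID struct {
	ms, seq uint64
}

type entry struct {
	ID     streamID
	Fields []string
}

// parseBound parses an XRANGE bound. defaultSeq is used when the bound has no
// sequence part (for example "1234567890001").
func parseBound(s string, defaultSeq uint64) (streamID, error) {
	switch s {
	case "-":
		return streamID{}, nil
	case "+":
		return streamID{math.MaxUint64, math.MaxUint64}, nil
	}
	msPart, seqPart, hasSeq := strings.Cut(s, "-")
	ms, err := strconv.ParseUint(msPart, 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("invalid timestamp in %q: %w", s, err)
	}
	if !hasSeq {
		return streamID{ms, defaultSeq}, nil
	}
	seq, err := strconv.ParseUint(seqPart, 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("invalid sequence in %q: %w", s, err)
	}
	return streamID{ms, seq}, nil
}

// compare orders IDs by timestamp, then by sequence.
func compare(a, b streamID) int {
	switch {
	case a.ms != b.ms && a.ms < b.ms, a.ms == b.ms && a.seq < b.seq:
		return -1
	case a == b:
		return 0
	}
	return 1
}

// xrange returns the entries whose IDs lie in [start, end], both inclusive.
func xrange(entries []entry, start, end string) ([]entry, error) {
	lo, err := parseBound(start, 0)
	if err != nil {
		return nil, err
	}
	hi, err := parseBound(end, math.MaxUint64)
	if err != nil {
		return nil, err
	}
	var out []entry
	for _, e := range entries {
		if compare(e.ID, lo) >= 0 && compare(e.ID, hi) <= 0 {
			out = append(out, e)
		}
	}
	return out, nil
}

func main() {
	entries := []entry{
		{ID: streamID{1234567890000, 0}, Fields: []string{"sensor", "A"}},
		{ID: streamID{1234567890001, 0}, Fields: []string{"sensor", "B"}},
		{ID: streamID{1234567890002, 0}, Fields: []string{"sensor", "C"}},
	}
	got, err := xrange(entries, "1234567890001", "+")
	fmt.Println(len(got), err)
}
```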

XREAD

Reads entries from one or more streams, optionally blocking until new data arrives.

Syntax

XREAD [block milliseconds] STREAMS key [key ...] ID [ID ...]
The block keyword must be lowercase: the implementation matches the literal string "block".

Parameters

block milliseconds
integer
Block for the specified number of milliseconds if no data is available. Use 0 to block indefinitely until new data arrives. Must be lowercase.
key
string
required
One or more stream keys to read from
ID
string
required
The ID to read after, one per stream. Use $ to read only new entries (those with IDs greater than the current last entry), 0-0 to read all entries, or any other ID to read only the entries after it.

Return Value

Array reply: For each stream, returns a two-element array:
  1. The stream name
  2. An array of entries
Returns a nil reply (the null bulk string $-1\r\n) if the timeout expires with no new data.

Examples

# Add initial data
XADD mystream * field1 value1
XADD mystream * field2 value2

# Read all entries from the beginning
XREAD STREAMS mystream 0-0
# Response:
# 1) 1) "mystream"
#    2) 1) 1) "1234567890000-0"
#          2) 1) "field1"
#             2) "value1"
#       2) 1) "1234567890001-0"
#          2) 1) "field2"
#             2) "value2"

# Read entries after a specific ID
XREAD STREAMS mystream 1234567890000-0
# Response: (only the second entry)

# Block for 1 second waiting for new data
XREAD block 1000 STREAMS mystream $
# If no data arrives in 1 second:
# Response: (nil)

# In another connection, add data while the XREAD is still blocked:
XADD mystream * field3 value3
# The blocked XREAD returns the new entry immediately

# Block indefinitely (0 = wait forever)
XREAD block 0 STREAMS mystream $
# Waits until new data is added with XADD

Blocking Behavior

When using the block option:
  • With timeout (block 1000): Waits up to the specified milliseconds for new data
  • Without timeout (block 0): Waits indefinitely until new data arrives
  • Non-blocking (no block): Returns immediately with available data

Implementation Details

From store.go:427-506, key features:
  1. Blocking implementation (lines 434-453):
    if buff[1] == "block" {
        waitTime, _ = strconv.Atoi(buff[2])
        timeoutCh := time.After(time.Duration(waitTime) * time.Millisecond)
        if waitTime != 0 {
            select {
            case <-kv.StreamXCh:
                // New data arrived
            case <-timeoutCh:
                res = []byte("$-1\r\n")
                break switchLoop
            }
        } else {
            <-kv.StreamXCh  // Wait indefinitely
        }
    }
    
  2. Special $ handling (lines 469-479): Returns only the most recent entry
  3. Threshold filtering (lines 482-500): Returns only entries with IDs greater than the specified threshold
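
The blocking excerpt above depends on surrounding state (kv, buff, res), so it cannot run on its own. The following standalone Go sketch shows the same wait-with-timeout pattern; `parseBlockMillis` and `waitForData` are hypothetical helpers, and checking the strconv.Atoi error (which the excerpt discards) is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseBlockMillis parses the argument that follows "block" and rejects
// malformed or negative values instead of silently treating them as 0.
func parseBlockMillis(arg string) (int, error) {
	ms, err := strconv.Atoi(arg)
	if err != nil {
		return 0, fmt.Errorf("invalid block timeout %q: %w", arg, err)
	}
	if ms < 0 {
		return 0, fmt.Errorf("block timeout must not be negative: %d", ms)
	}
	return ms, nil
}

// waitForData blocks until a signal arrives or the timeout expires. It
// returns true when data arrived and false on timeout. A timeout of 0 waits
// indefinitely, matching "block 0".
func waitForData(signal <-chan struct{}, timeoutMs int) bool {
	if timeoutMs == 0 {
		<-signal
		return true
	}
	timer := time.NewTimer(time.Duration(timeoutMs) * time.Millisecond)
	defer timer.Stop() // release the timer if data arrives first
	select {
	case <-signal:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	signal := make(chan struct{}, 1)
	ms, err := parseBlockMillis("50")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("data arrived:", waitForData(signal, ms)) // nothing was sent yet
	signal <- struct{}{}
	fmt.Println("data arrived:", waitForData(signal, ms)) // signal is pending
}
```

Creating the timer only in the timed branch avoids allocating one for block 0, where it would never be read.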

Use Cases

Event Logging

# Log application events
XADD app:events * type "user_login" user_id 123 timestamp 1234567890
XADD app:events * type "page_view" user_id 123 page "/dashboard"

# Query events in a time range
XRANGE app:events 1234567890000-0 1234567899999-0

Real-time Monitoring

# Monitor sensor data with blocking
XREAD block 0 STREAMS sensors:temperature $

# In another connection:
XADD sensors:temperature * celsius 22.5 location "room-1"
# The blocked XREAD immediately receives this data

Message Queue

# Producer adds tasks
XADD tasks * type "email" recipient "user@example.com" priority "high"
XADD tasks * type "sms" recipient "+1234567890" priority "normal"

# Consumer processes tasks
XREAD block 5000 STREAMS tasks 0-0
# Process each task...

# Continue from last processed ID
XREAD block 5000 STREAMS tasks 1234567890001-0
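
The consumer pattern above relies on remembering the last processed ID and passing it to the next XREAD. The Go sketch below simulates that cursor logic in memory, without a server connection; `task`, `after`, and `poll` are hypothetical names used only for illustration.

```go
package main

import "fmt"

type streamID struct {
	ms, seq uint64
}

// task is a hypothetical decoded entry from the tasks stream.
type task struct {
	ID   streamID
	Type string
}

// after reports whether id is strictly greater than cursor.
func after(id, cursor streamID) bool {
	return id.ms > cursor.ms || (id.ms == cursor.ms && id.seq > cursor.seq)
}

// poll mimics one XREAD call: it returns the tasks with IDs greater than
// cursor, plus the cursor to use for the next call. The stream is assumed to
// be ordered by ID, as an append-only log is.
func poll(stream []task, cursor streamID) ([]task, streamID) {
	var batch []task
	next := cursor
	for _, t := range stream {
		if after(t.ID, cursor) {
			batch = append(batch, t)
			next = t.ID
		}
	}
	return batch, next
}

func main() {
	stream := []task{
		{ID: streamID{1234567890000, 0}, Type: "email"},
		{ID: streamID{1234567890001, 0}, Type: "sms"},
	}
	cursor := streamID{} // 0-0: start from the beginning

	batch, cursor := poll(stream, cursor)
	fmt.Println(len(batch), "tasks, next cursor", cursor)

	// A later poll with the saved cursor returns only newer tasks.
	stream = append(stream, task{ID: streamID{1234567890002, 0}, Type: "push"})
	batch, cursor = poll(stream, cursor)
	fmt.Println(len(batch), "tasks, next cursor", cursor)
}
```

Because the cursor only advances past entries that were returned, a consumer that persists it after processing each batch can resume without re-reading or skipping tasks.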

Time Series Data

# Record metrics
XADD metrics:cpu * usage 45.2 cores 4
XADD metrics:cpu * usage 52.1 cores 4
XADD metrics:cpu * usage 38.7 cores 4

# Query historical data
XRANGE metrics:cpu - +

# Monitor new metrics in real-time
XREAD block 0 STREAMS metrics:cpu $
