Redis streams are append-only log data structures designed for message queuing and event sourcing. They support consumer groups, acknowledgments, and time-based querying.
XADD
Appends a new entry to a stream. Creates the stream if it doesn’t exist.
Syntax
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]]
<* | id> field value [field value ...]
- key: The stream key. Created if it doesn’t exist (unless NOMKSTREAM is specified).
- NOMKSTREAM: Don’t create the stream if it doesn’t exist.
- MAXLEN: Trim the stream to the specified maximum length.
- MINID: Remove entries with IDs lower than the threshold.
- ~: Approximate trimming (more efficient).
- LIMIT: Limit the number of entries evicted during trimming.
- * | id: Entry ID. Use * for auto-generation, or specify as milliseconds-sequence.
Returns: The ID of the added entry, or nil if NOMKSTREAM prevented creation.
Time Complexity: O(1) when adding, O(N) when trimming where N is the number of evicted entries.
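The milliseconds-sequence ID format can be illustrated with a small Python sketch. The names `StreamID`, `parse_stream_id` and `id_to_datetime` are hypothetical helpers, not part of Redis or any client library, and treating a missing sequence part as 0 is an assumption made for this example.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class StreamID(NamedTuple):
    """A stream entry ID: Unix time in milliseconds plus a sequence number."""
    ms: int
    seq: int

    def __str__(self) -> str:
        return f"{self.ms}-{self.seq}"


def parse_stream_id(raw: str) -> StreamID:
    """Parse an explicit ID such as "1640000000000-0".

    Assumption for this sketch: an ID without a sequence part gets sequence 0.
    """
    ms, _, seq = raw.partition("-")
    return StreamID(int(ms), int(seq) if seq else 0)


def id_to_datetime(entry_id: StreamID) -> datetime:
    """Convert the millisecond part of an ID to a UTC datetime."""
    return datetime.fromtimestamp(entry_id.ms / 1000, tz=timezone.utc)


first = parse_stream_id("1640000000000-0")
second = parse_stream_id("1640000001000-0")
print(first < second)         # tuples compare by ms first, then by seq
print(id_to_datetime(first))  # wall-clock time encoded in the ID
```

Because IDs order by time and then by sequence, comparing the parsed tuples gives the same order in which the entries sit in the stream.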
History:
- 6.2.0: Added NOMKSTREAM, MINID trimming strategy and LIMIT option.
- 7.0.0: Added support for <ms>-* explicit ID form.
Examples
redis> XADD mystream * sensor-id 1234 temperature 19.8
"1640000000000-0"
redis> XADD mystream * sensor-id 1234 temperature 20.1
"1640000001000-0"
redis> XADD mystream MAXLEN ~ 1000 * sensor-id 1234 temperature 20.5
"1640000002000-0"
redis> XADD mystream * field1 "value1" field2 "value2" field3 "value3"
"1640000003000-0"
XREAD
Reads entries from one or more streams.
Syntax
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- COUNT: Maximum number of entries to return per stream.
- BLOCK: Block for the specified number of milliseconds waiting for new entries. Use 0 to block indefinitely.
- key: Stream key(s) to read from.
- id: Starting ID for each stream; only entries with a greater ID are returned. Use $ for new entries only, or 0 for all entries.
Returns: Array of streams with their entries, or nil if no entries are available (for a blocking read, when the timeout expires).
Time Complexity: O(N) where N is the number of entries returned.
Examples
redis> XADD mystream * field1 "value1"
"1640000000000-0"
redis> XADD mystream * field2 "value2"
"1640000001000-0"
redis> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1640000000000-0"
         2) 1) "field1"
            2) "value1"
      2) 1) "1640000001000-0"
         2) 1) "field2"
            2) "value2"
redis> XREAD BLOCK 5000 STREAMS mystream $
(nil)
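The nested reply shown above can be awkward to consume directly. The following Python sketch flattens it into a dictionary keyed by stream name. `parse_xread_reply` is a hypothetical helper, and the input shape (nested lists, with fields and values alternating) is assumed to match the redis-cli output above; client libraries may already return a friendlier structure.

```python
from typing import Dict, List, Optional, Tuple

Entry = Tuple[str, Dict[str, str]]


def parse_xread_reply(reply: Optional[list]) -> Dict[str, List[Entry]]:
    """Turn a raw XREAD reply into {stream: [(id, {field: value}), ...]}.

    A nil reply (None) becomes an empty dict.
    """
    result: Dict[str, List[Entry]] = {}
    for stream, entries in reply or []:
        parsed = []
        for entry_id, flat in entries:
            # Fields and values alternate: [f1, v1, f2, v2, ...]
            fields = dict(zip(flat[0::2], flat[1::2]))
            parsed.append((entry_id, fields))
        result[stream] = parsed
    return result


# The same reply as in the XREAD example, written as nested Python lists.
raw_reply = [
    ["mystream", [
        ["1640000000000-0", ["field1", "value1"]],
        ["1640000001000-0", ["field2", "value2"]],
    ]],
]
parsed = parse_xread_reply(raw_reply)
print(parsed["mystream"][-1][0])  # ID to pass as the next starting ID
```

Passing the last ID seen back as the starting ID of the next XREAD call is what lets a reader resume without missing or repeating entries.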
XGROUP
Manages consumer groups.
Syntax
XGROUP CREATE key group <id | $> [MKSTREAM]
XGROUP SETID key group <id | $>
XGROUP DESTROY key group
XGROUP CREATECONSUMER key group consumer
XGROUP DELCONSUMER key group consumer
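- id: Starting ID for the group. Use $ to deliver only entries added after the group is created, or 0 to deliver the stream from the beginning.
- MKSTREAM: Create the stream if it doesn’t exist.
- consumer: Consumer name within the group.
Returns: OK for CREATE and SETID. CREATECONSUMER returns 1 if the consumer was created and 0 if it already existed, DESTROY returns the number of groups destroyed, and DELCONSUMER returns the number of pending entries the consumer held before deletion.
Time Complexity: O(1) for CREATE, SETID, CREATECONSUMER and DELCONSUMER; O(N) for DESTROY, where N is the number of entries in the group’s pending entries list.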
Examples
redis> XGROUP CREATE mystream mygroup $ MKSTREAM
OK
redis> XGROUP CREATECONSUMER mystream mygroup consumer1
(integer) 1
redis> XGROUP CREATECONSUMER mystream mygroup consumer2
(integer) 1
redis> XINFO GROUPS mystream
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 0
redis> XGROUP DESTROY mystream mygroup
(integer) 1
XACK
Acknowledges processed messages in a consumer group.
Syntax
XACK key group id [id ...]
- id: Entry ID(s) to acknowledge.
Returns: Number of messages successfully acknowledged.
Time Complexity: O(1) for each message ID.
Examples
redis> XACK mystream mygroup 1640000000000-0
(integer) 1
redis> XACK mystream mygroup 1640000001000-0 1640000002000-0
(integer) 2
Additional Stream Commands
- XLEN: Get stream length
- XRANGE: Query range of entries
- XREVRANGE: Query range in reverse
- XREADGROUP: Read from consumer group
- XPENDING: Get pending messages info
- XCLAIM: Claim pending messages
- XAUTOCLAIM: Automatically claim pending messages
- XINFO: Get stream information
- XTRIM: Trim stream
- XDEL: Delete entries
- XSETID: Set stream last ID
Use Cases
Event Sourcing
Store all events in order:
# Store events
redis> XADD events * type "user_created" user_id "1000" username "alice"
"1640000000000-0"
redis> XADD events * type "user_updated" user_id "1000" email "[email protected]"
"1640000001000-0"
redis> XADD events * type "user_deleted" user_id "1000"
"1640000002000-0"
# Replay events
redis> XRANGE events - +
1) 1) "1640000000000-0"
   2) 1) "type"
      2) "user_created"
      3) "user_id"
      4) "1000"
      5) "username"
      6) "alice"
2) 1) "1640000001000-0"
   2) 1) "type"
      2) "user_updated"
      3) "user_id"
      4) "1000"
      5) "email"
      6) "[email protected]"
3) 1) "1640000002000-0"
   2) 1) "type"
      2) "user_deleted"
      3) "user_id"
      4) "1000"
Message Queue with Consumer Groups
Distribute work among consumers:
# Create consumer group (MKSTREAM is required because the stream doesn’t exist yet)
redis> XGROUP CREATE tasks mygroup 0 MKSTREAM
OK
# Add tasks
redis> XADD tasks * job "process_image" image_id "img1"
"1640000000000-0"
redis> XADD tasks * job "send_email" user_id "1000"
"1640000001000-0"
# Consumer 1 reads
redis> XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS tasks >
1) 1) "tasks"
   2) 1) 1) "1640000000000-0"
         2) 1) "job"
            2) "process_image"
            3) "image_id"
            4) "img1"
# Consumer 2 reads (gets different message)
redis> XREADGROUP GROUP mygroup consumer2 COUNT 1 STREAMS tasks >
1) 1) "tasks"
   2) 1) 1) "1640000001000-0"
         2) 1) "job"
            2) "send_email"
            3) "user_id"
            4) "1000"
# Acknowledge after processing
redis> XACK tasks mygroup 1640000000000-0
(integer) 1
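The read, process, acknowledge cycle above could be wrapped in a worker function along these lines. This is a sketch under assumptions: `process_batch` and `FakeClient` are hypothetical names, the client is assumed to expose redis-py style `xreadgroup()` and `xack()` methods, and the reply is assumed to be a list of `[stream, [(id, fields), ...]]` pairs. `FakeClient` is an in-memory stand-in so the example runs without a server.

```python
from typing import Callable, Dict


def process_batch(client, stream: str, group: str, consumer: str,
                  handler: Callable[[Dict[str, str]], None],
                  count: int = 10) -> int:
    """Read up to `count` new entries and acknowledge those handled successfully.

    Returns the number of acknowledged entries.
    """
    reply = client.xreadgroup(group, consumer, {stream: ">"}, count=count)
    acked = 0
    for _stream, entries in reply or []:
        for entry_id, fields in entries:
            try:
                handler(fields)
            except Exception:
                # Leave the entry pending so it can be claimed and retried.
                continue
            acked += client.xack(stream, group, entry_id)
    return acked


class FakeClient:
    """In-memory stand-in used only to exercise process_batch()."""

    def __init__(self, entries):
        self.new = list(entries)  # entries not yet delivered
        self.pending = set()      # delivered but not acknowledged

    def xreadgroup(self, groupname, consumername, streams, count=None):
        stream = next(iter(streams))
        n = len(self.new) if count is None else count
        batch, self.new = self.new[:n], self.new[n:]
        self.pending.update(entry_id for entry_id, _ in batch)
        return [[stream, batch]] if batch else []

    def xack(self, name, groupname, *ids):
        acked = [i for i in ids if i in self.pending]
        self.pending.difference_update(acked)
        return len(acked)


def handler(fields):
    if fields["job"] == "send_email":
        raise RuntimeError("mail server unavailable")


client = FakeClient([
    ("1640000000000-0", {"job": "process_image", "image_id": "img1"}),
    ("1640000001000-0", {"job": "send_email", "user_id": "1000"}),
])
acked = process_batch(client, "tasks", "mygroup", "consumer1", handler)
print(acked, sorted(client.pending))
```

Acknowledging only after the handler returns means a failed entry stays in the pending list, where XPENDING, XCLAIM or XAUTOCLAIM can find it later.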
Time Series Data
Store sensor readings:
redis> XADD sensors * device "sensor1" temp "22.5" humidity "45"
"1640000000000-0"
redis> XADD sensors * device "sensor1" temp "23.1" humidity "46"
"1640000001000-0"
redis> XADD sensors * device "sensor2" temp "21.8" humidity "50"
"1640000002000-0"
# Query time range
redis> XRANGE sensors 1640000000000 1640000001000
1) 1) "1640000000000-0"
   2) 1) "device"
      2) "sensor1"
      3) "temp"
      4) "22.5"
      5) "humidity"
      6) "45"
2) 1) "1640000001000-0"
   2) 1) "device"
      2) "sensor1"
      3) "temp"
      4) "23.1"
      5) "humidity"
      6) "46"
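The XRANGE bounds in the query above are plain millisecond timestamps. A minimal Python sketch for deriving them from datetimes follows. `xrange_bounds` is a hypothetical helper, and it assumes timezone-aware datetimes.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_ms(dt: datetime) -> int:
    """Whole milliseconds since the Unix epoch, using exact integer arithmetic."""
    return (dt - EPOCH) // timedelta(milliseconds=1)


def xrange_bounds(start: datetime, end: datetime) -> Tuple[str, str]:
    """Return (start, end) arguments for an XRANGE call covering [start, end].

    Only the millisecond part is given: Redis completes a start ID with
    sequence 0 and an end ID with the maximum sequence number.
    """
    return str(to_ms(start)), str(to_ms(end))


start = datetime(2021, 12, 20, 11, 33, 20, tzinfo=timezone.utc)
bounds = xrange_bounds(start, start + timedelta(seconds=1))
print("XRANGE sensors", *bounds)
```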
Activity Feed
Track user activities:
redis> XADD feed:user:1000 * action "login" ip "192.168.1.1"
"1640000000000-0"
redis> XADD feed:user:1000 * action "view_post" post_id "50"
"1640000001000-0"
redis> XADD feed:user:1000 * action "logout"
"1640000002000-0"
# Get recent activities
redis> XREVRANGE feed:user:1000 + - COUNT 10
1) 1) "1640000002000-0"
   2) 1) "action"
      2) "logout"
2) 1) "1640000001000-0"
   2) 1) "action"
      2) "view_post"
      3) "post_id"
      4) "50"
3) 1) "1640000000000-0"
   2) 1) "action"
      2) "login"
      3) "ip"
      4) "192.168.1.1"
Best Practices
- Append: XADD is O(1) - very fast
- Trimming: Use approximate trimming (~) for better performance
- Range Queries: Limit results with COUNT
- Consumer Groups: Distribute load across multiple consumers
Memory Management
- Use MAXLEN to cap stream size
- Trim by ID with MINID for time-based retention
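- Set LIMIT on trimming to bound the work done per call (LIMIT is only accepted together with approximate trimming, ~)
- Monitor stream size with XINFO STREAM and memory footprint with MEMORY USAGE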
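Time-based retention with MINID comes down to computing a threshold ID from the current time. The sketch below builds the argument list for an XTRIM call. `minid_trim_args` is a hypothetical helper, and the key name and retention period are illustrative only.

```python
import time
from typing import List, Optional


def minid_trim_args(key: str, retention_ms: int,
                    now_ms: Optional[int] = None,
                    limit: Optional[int] = None) -> List[str]:
    """Build XTRIM arguments that evict entries older than `retention_ms`."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    threshold = max(now_ms - retention_ms, 0)
    # Approximate trimming (~) lets Redis evict whole internal nodes cheaply.
    args = ["XTRIM", key, "MINID", "~", str(threshold)]
    if limit is not None:
        # LIMIT is only accepted together with approximate (~) trimming.
        args += ["LIMIT", str(limit)]
    return args


# Keep one hour of data, evicting at most 100 entries per call.
args = minid_trim_args("sensors", retention_ms=3_600_000,
                       now_ms=1_640_003_600_000, limit=100)
print(" ".join(args))
```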
Consumer Groups
Single Consumer
Simple processing:
redis> XREAD BLOCK 5000 STREAMS mystream $
# Process entries
Consumer Group
Distributed processing:
redis> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
# Process entries
redis> XACK mystream mygroup <entry-id>
Patterns
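At-Least-Once Processing
Acknowledge each message only after it has been processed. If a consumer crashes before XACK, the entry stays pending and can be delivered again, so handlers should be idempotent: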
# Read from group
redis> XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
# Process message...
# Acknowledge
redis> XACK mystream mygroup 1640000000000-0
(integer) 1
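An entry that was read but never acknowledged stays pending and may be handed to a consumer again, so a handler can see the same entry twice. One way to keep the result effectively once is to deduplicate on the entry ID, sketched below. `IdempotentProcessor` is a hypothetical class, and its in-memory set is an assumption made for brevity; a real deployment would need durable storage shared by all consumers.

```python
from typing import Callable, Dict, Set


class IdempotentProcessor:
    """Run a handler at most once per entry ID."""

    def __init__(self, handler: Callable[[Dict[str, str]], None]) -> None:
        self._handler = handler
        self._seen: Set[str] = set()

    def process(self, entry_id: str, fields: Dict[str, str]) -> bool:
        """Return True if the handler ran, False if the entry was a duplicate."""
        if entry_id in self._seen:
            return False
        self._handler(fields)
        # Recorded only after the handler succeeds, so failures can be retried.
        self._seen.add(entry_id)
        return True


handled = []
processor = IdempotentProcessor(handled.append)
ran_first = processor.process("1640000000000-0", {"job": "process_image"})
ran_again = processor.process("1640000000000-0", {"job": "process_image"})
print(ran_first, ran_again, len(handled))
```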
Dead Letter Queue
Handle failed messages:
# Check pending messages
redis> XPENDING mystream mygroup
# Claim old pending messages
redis> XAUTOCLAIM mystream mygroup consumer2 3600000 0-0 COUNT 10
# Move failed messages to DLQ
redis> XADD dlq * original_stream "mystream" entry_id "1640000000000-0" reason "processing_failed"
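Deciding which pending entries to retry and which to move to the dead letter stream can be based on idle time and delivery count, both of which the extended form of XPENDING reports per entry. The sketch below shows one possible policy. `PendingEntry`, `split_pending` and the thresholds are hypothetical and not part of Redis.

```python
from typing import Iterable, List, NamedTuple, Tuple


class PendingEntry(NamedTuple):
    """One row of the extended XPENDING reply."""
    entry_id: str
    consumer: str
    idle_ms: int
    deliveries: int


def split_pending(pending: Iterable[PendingEntry], min_idle_ms: int,
                  max_deliveries: int) -> Tuple[List[str], List[str]]:
    """Split stale pending entries into (retry, dead_letter) ID lists.

    Entries idle for less than `min_idle_ms` are left with their consumer.
    """
    retry: List[str] = []
    dead: List[str] = []
    for entry in pending:
        if entry.idle_ms < min_idle_ms:
            continue
        if entry.deliveries >= max_deliveries:
            dead.append(entry.entry_id)
        else:
            retry.append(entry.entry_id)
    return retry, dead


pending = [
    PendingEntry("1640000000000-0", "consumer1", 7_200_000, 5),
    PendingEntry("1640000001000-0", "consumer1", 7_200_000, 1),
    PendingEntry("1640000002000-0", "consumer2", 1_000, 1),
]
retry, dead = split_pending(pending, min_idle_ms=3_600_000, max_deliveries=3)
print(retry, dead)
```

IDs in the retry list would be claimed by another consumer, while IDs in the dead letter list would be copied to the dlq stream and then acknowledged so they leave the pending list.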
Capped Stream
Maintain fixed-size stream:
redis> XADD mystream MAXLEN ~ 10000 * field value
"1640000000000-0"
Multi-Stream Fan-out
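Read from multiple streams. The $ ID only makes sense together with BLOCK, because a non-blocking read starting at $ never finds entries:
redis> XREAD COUNT 10 BLOCK 5000 STREAMS stream1 stream2 stream3 $ $ $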
Consumer group state is persistent. Always acknowledge processed messages to prevent pending messages from accumulating.
Stream vs Other Data Structures
| Feature | Stream | List | Pub/Sub |
|---|---|---|---|
| Persistence | Yes | Yes | No |
| History | Yes | Yes | No |
| Consumer Groups | Yes | No | No |
| Blocking | Yes | Yes | Yes |
| Use Case | Event logs | Queues | Real-time |
Advanced Patterns
Retry Mechanism
# Read with group
redis> XREADGROUP GROUP mygroup consumer1 STREAMS tasks >
# On failure, leave pending
# Later, claim and retry
redis> XCLAIM tasks mygroup consumer2 3600000 1640000000000-0
Stream Branching
# Main stream
redis> XADD orders * order_id "1000"
# Branch to specialized streams
redis> XADD orders:priority * order_id "1000"
redis> XADD orders:analytics * order_id "1000"
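The branching step above can be driven by a small routing function that decides which streams receive each entry. In the sketch below, `branch_targets` is a hypothetical helper, and the `priority` field is an assumption introduced only for illustration; the original example copies the order to every branch unconditionally.

```python
from typing import Dict, List


def branch_targets(order: Dict[str, str]) -> List[str]:
    """Return the stream keys an order should be appended to."""
    targets = ["orders", "orders:analytics"]
    # Hypothetical rule: only high-priority orders go to the priority branch.
    if order.get("priority") == "high":
        targets.insert(1, "orders:priority")
    return targets


urgent = branch_targets({"order_id": "1000", "priority": "high"})
normal = branch_targets({"order_id": "1001"})
print(urgent)
print(normal)
```

Each returned key would receive its own XADD. Separate XADD calls are not atomic as a group, so wrapping them in MULTI/EXEC is worth considering when all branches must stay in step.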