Usage
import { Redis } from "@upstash/redis";

const redis = new Redis({
  url: "<UPSTASH_REDIS_URL>",
  token: "<UPSTASH_REDIS_TOKEN>",
});

const entries = await redis.xread("events", "0");
Parameters
key
string | string[]
required
The stream key(s) to read from. Can be a single key or an array of keys.
id
string | string[]
required
The starting ID(s) for each stream. Must match the number of keys:
"0" - Read from the beginning of the stream
"<ms>-<seq>" - Read entries after this specific ID
"$" - Read only new entries (commonly used with BLOCK)
If key is an array, id must also be an array with the same length.
Optional reading options, passed as the third argument:
count - Maximum number of entries to return per stream.
Deprecated: Block for the specified number of milliseconds while waiting for new entries. Not yet supported in Upstash Redis.
Response
An array of stream data. The exact structure depends on the streams read. For single-stream reads, entries are returned in the format:
[
  ["stream-name", [
    ["id", ["field1", "value1", "field2", "value2", ...]],
    ...
  ]]
]
Returns an empty array if no entries are found.
Examples
Read from beginning of stream
// Add some entries first
await redis.xadd("events", "*", { user: "alice", action: "login" });
await redis.xadd("events", "*", { user: "bob", action: "logout" });

// Read all entries
const result = await redis.xread("events", "0");
console.log(result);
// Returns:
// [
//   ["events", [
//     ["1678901234567-0", ["user", "alice", "action", "login"]],
//     ["1678901234568-0", ["user", "bob", "action", "logout"]]
//   ]]
// ]
Read entries after a specific ID
const id1 = await redis.xadd("events", "*", { event: "first" });
const id2 = await redis.xadd("events", "*", { event: "second" });
const id3 = await redis.xadd("events", "*", { event: "third" });

// Read entries after id1
const result = await redis.xread("events", id1);
// Returns only entries with id2 and id3
Limit number of entries
// Add 10 entries
for (let i = 0; i < 10; i++) {
  await redis.xadd("events", "*", { index: i });
}

// Read only 5 entries
const result = await redis.xread("events", "0", { count: 5 });
// Returns only the first 5 entries
Read from multiple streams
// Add entries to multiple streams
await redis.xadd("stream1", "*", { source: "stream1", data: "a" });
await redis.xadd("stream2", "*", { source: "stream2", data: "b" });

// Read from both streams
const result = await redis.xread(
  ["stream1", "stream2"], // Array of keys
  ["0", "0"] // Array of starting IDs
);
console.log(result);
// Returns:
// [
//   ["stream1", [["1678901234567-0", ["source", "stream1", "data", "a"]]]],
//   ["stream2", [["1678901234568-0", ["source", "stream2", "data", "b"]]]]
// ]
Poll for new entries
let lastId = "0";

// Initial read
const initial = await redis.xread("events", lastId);
if (initial && initial.length > 0) {
  const entries = initial[0][1] as any[];
  if (entries.length > 0) {
    lastId = entries[entries.length - 1][0]; // Update to last seen ID
  }
}

// Later, poll for new entries
const newEntries = await redis.xread("events", lastId);
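The cursor update in the polling example can be factored into a small pure helper so that every poll advances `lastId` the same way. The following is a minimal sketch, assuming the response shape documented in the Response section; `advanceCursor`, `StreamEntry`, and `XReadResponse` are hypothetical names introduced for illustration and are not part of `@upstash/redis`.

```typescript
// Hypothetical types describing the documented XREAD response shape.
type StreamEntry = [string, unknown[]];
type XReadResponse = [string, StreamEntry[]][];

// Returns the ID of the last entry read from `stream`, or `current`
// unchanged when the response holds nothing for that stream.
function advanceCursor(
  response: XReadResponse | null | undefined,
  stream: string,
  current: string,
): string {
  if (!response) return current;
  for (const [name, entries] of response) {
    const last = entries[entries.length - 1];
    if (name === stream && last !== undefined) {
      return last[0];
    }
  }
  return current;
}

// Sample data in the documented format (IDs are illustrative).
const sample: XReadResponse = [
  ["events", [
    ["1678901234567-0", ["user", "alice"]],
    ["1678901234568-0", ["user", "bob"]],
  ]],
];

const cursorAfterRead = advanceCursor(sample, "events", "0");
// An empty poll leaves the cursor where it was.
const cursorAfterEmpty = advanceCursor([], "events", cursorAfterRead);
console.log(cursorAfterRead, cursorAfterEmpty);
```

In a polling loop, the result of each `redis.xread("events", lastId)` call would be passed through a helper like this before the next poll.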
Read only new entries using $
// Add some existing entries
await redis.xadd("events", "*", { existing: "data" });

// Read only entries added after this point
const futureEntries = await redis.xread("events", "$");
// Returns: [] (nothing new yet)

// Add new entry
await redis.xadd("events", "*", { new: "data" });
const newEntries = await redis.xread("events", "$");
// Still returns [] because $ means "entries newer than when the read started"
Processing stream entries
const result = await redis.xread("events", "0");
if (result && result.length > 0) {
  const [streamName, entries] = result[0] as [string, any[]];
  for (const [id, fields] of entries) {
    // Parse field-value pairs
    const data: Record<string, string> = {};
    for (let i = 0; i < fields.length; i += 2) {
      data[fields[i]] = fields[i + 1];
    }
    console.log(`Entry ${id}:`, data);
  }
}
Important Notes
Balanced Keys and IDs
When reading from multiple streams, the number of keys must match the number of IDs:
// ✅ Correct - 2 keys, 2 IDs
await redis.xread(["stream1", "stream2"], ["0", "0"]);

// ✅ Correct - 1 key, 1 ID
await redis.xread("stream1", "0");

// ❌ Error - 2 keys, 1 ID (unbalanced)
await redis.xread(["stream1", "stream2"], "0");

// ❌ Error - 2 keys, 3 IDs (unbalanced)
await redis.xread(["stream1", "stream2"], ["0", "0", "0"]);
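The balance rule can also be checked in application code before a request is sent. The sketch below is one possible guard, not something the SDK provides: `normalizeKeysAndIds` is a hypothetical helper that wraps single values in arrays and throws when the lengths differ.

```typescript
// Hypothetical guard: normalizes the two XREAD arguments to arrays and
// throws before any request is made if their lengths differ.
function normalizeKeysAndIds(
  key: string | string[],
  id: string | string[],
): { keys: string[]; ids: string[] } {
  const keys = Array.isArray(key) ? key : [key];
  const ids = Array.isArray(id) ? id : [id];
  if (keys.length !== ids.length) {
    throw new Error(
      `Unbalanced XREAD call: ${keys.length} key(s) but ${ids.length} ID(s)`,
    );
  }
  return { keys, ids };
}

// Balanced: 2 keys, 2 IDs.
const balanced = normalizeKeysAndIds(["stream1", "stream2"], ["0", "0"]);

// Unbalanced: 2 keys, 1 ID. The guard throws and we capture the message.
let unbalancedMessage = "";
try {
  normalizeKeysAndIds(["stream1", "stream2"], "0");
} catch (err) {
  unbalancedMessage = err instanceof Error ? err.message : String(err);
}
console.log(balanced, unbalancedMessage);
```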
Entry ID Positioning
XREAD returns entries after the specified ID, not including it:
const id1 = await redis.xadd("stream", "*", { n: 1 });
const id2 = await redis.xadd("stream", "*", { n: 2 });
const id3 = await redis.xadd("stream", "*", { n: 3 });

// Read after id1
const result = await redis.xread("stream", id1);
// Returns entries with id2 and id3 (NOT id1)
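The exclusive "after" rule can be reproduced locally by comparing `<ms>-<seq>` IDs, which is handy when filtering or de-duplicating entries already in memory. This is a rough sketch under two assumptions: both parts of the ID fit in a JavaScript safe integer, and only explicit IDs are passed (not `$`). The function names are hypothetical.

```typescript
// Splits "<ms>-<seq>" into numeric parts. A bare "0" is treated as "0-0".
// Assumes both parts fit in a safe integer.
function parseStreamId(id: string): [number, number] {
  const [ms = "0", seq = "0"] = id.split("-");
  return [Number(ms), Number(seq)];
}

// Negative if a < b, zero if equal, positive if a > b.
function compareStreamIds(a: string, b: string): number {
  const [aMs, aSeq] = parseStreamId(a);
  const [bMs, bSeq] = parseStreamId(b);
  return aMs !== bMs ? aMs - bMs : aSeq - bSeq;
}

// Mirrors the exclusive rule: keep only IDs strictly greater than `after`.
function idsAfter(ids: string[], after: string): string[] {
  return ids.filter((id) => compareStreamIds(id, after) > 0);
}

// Illustrative IDs; the first one plays the role of id1.
const allIds = ["1678901234567-0", "1678901234567-1", "1678901234568-0"];
const later = idsAfter(allIds, "1678901234567-0");
console.log(later); // ["1678901234567-1", "1678901234568-0"]
```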
Special ID Values
ID              Meaning
"0"             Start from the beginning
"$"             Only new entries (none if called once)
"<ms>-<seq>"    Start after this specific ID
Response Format
The response format is:
[
  ["stream-name", [
    ["entry-id", ["field1", "value1", "field2", "value2"]],
    ["entry-id", ["field1", "value1", "field2", "value2"]]
  ]],
  // ... more streams if reading multiple
]
Note that field-value pairs are returned as a flat array, not as an object.
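Because the pairs arrive flat, it can be convenient to convert a whole response, including multi-stream reads, into objects keyed by stream name. The following is an illustrative sketch that assumes every field and value is a string and that the response matches the format above; `pairsToObject`, `parseXRead`, and the type names are hypothetical and not part of the SDK.

```typescript
// Hypothetical types for the documented response shape.
type StreamEntry = [string, string[]];
type XReadResponse = [string, StreamEntry[]][];

interface ParsedEntry {
  id: string;
  data: Record<string, string>;
}

// Turns ["field1", "value1", "field2", "value2"] into an object.
// A trailing field without a value is ignored.
function pairsToObject(fields: string[]): Record<string, string> {
  const data: Record<string, string> = {};
  for (let i = 0; i + 1 < fields.length; i += 2) {
    const field = fields[i];
    const value = fields[i + 1];
    if (field !== undefined && value !== undefined) {
      data[field] = value;
    }
  }
  return data;
}

// Groups parsed entries by stream name.
function parseXRead(response: XReadResponse): Record<string, ParsedEntry[]> {
  const out: Record<string, ParsedEntry[]> = {};
  for (const [stream, entries] of response) {
    out[stream] = entries.map(([id, fields]) => ({
      id,
      data: pairsToObject(fields),
    }));
  }
  return out;
}

// Sample taken from the multi-stream example (IDs are illustrative).
const raw: XReadResponse = [
  ["stream1", [["1678901234567-0", ["source", "stream1", "data", "a"]]]],
  ["stream2", [["1678901234568-0", ["source", "stream2", "data", "b"]]]],
];
const parsed = parseXRead(raw);
console.log(JSON.stringify(parsed, null, 2));
```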
Count Limit
The count option limits entries per stream, not total:
// Reads up to 10 entries from stream1 AND up to 10 from stream2
await redis.xread(
  ["stream1", "stream2"],
  ["0", "0"],
  { count: 10 }
);
// May return up to 20 total entries (10 per stream)
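To check how many entries a read actually returned per stream, the response can be tallied locally. This is a small sketch with hypothetical helper names, run against hand-written sample data rather than a live call.

```typescript
// Hypothetical types for the documented response shape.
type StreamEntry = [string, unknown[]];
type XReadResponse = [string, StreamEntry[]][];

// Counts the entries returned for each stream in an XREAD response.
function countPerStream(response: XReadResponse): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const [stream, entries] of response) {
    counts[stream] = entries.length;
  }
  return counts;
}

// Upper bound on the total: `count` applies to each stream separately.
function maxTotalEntries(streamCount: number, count: number): number {
  return streamCount * count;
}

// Illustrative response: 2 entries from stream1, 1 from stream2.
const sampleResponse: XReadResponse = [
  ["stream1", [
    ["1678901234567-0", ["data", "a"]],
    ["1678901234567-1", ["data", "b"]],
  ]],
  ["stream2", [["1678901234568-0", ["data", "c"]]]],
];

const counts = countPerStream(sampleResponse);
const upperBound = maxTotalEntries(2, 10);
console.log(counts, upperBound);
```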