Skip to main content

Usage

import { Redis } from "@upstash/redis";

const redis = new Redis({
  url: "<UPSTASH_REDIS_URL>",
  token: "<UPSTASH_REDIS_TOKEN>",
});

const entries = await redis.xread("events", "0");

Parameters

key
string | string[]
required
The stream key(s) to read from. Can be a single key or an array of keys.
id
string | string[]
required
The starting ID(s) for each stream. Must match the number of keys:
  • "0" - Read from the beginning of the stream
  • "<ms>-<seq>" - Read entries after this specific ID
  • "$" - Read only new entries (commonly used with BLOCK)
If key is an array, id must also be an array with the same length.
options
object
Optional reading options:

Response

result
unknown[]
An array of stream data. The exact structure depends on the streams read.For single stream reads, returns entries in the format:
[
  ["stream-name", [
    ["id", ["field1", "value1", "field2", "value2", ...]],
    ...
  ]]
]
Returns an empty array if no entries are found.

Examples

Read from beginning of stream

// Add some entries first
await redis.xadd("events", "*", { user: "alice", action: "login" });
await redis.xadd("events", "*", { user: "bob", action: "logout" });

// Read all entries
const result = await redis.xread("events", "0");
console.log(result);
// Returns:
// [
//   ["events", [
//     ["1678901234567-0", ["user", "alice", "action", "login"]],
//     ["1678901234568-0", ["user", "bob", "action", "logout"]]
//   ]]
// ]

Read entries after a specific ID

const id1 = await redis.xadd("events", "*", { event: "first" });
const id2 = await redis.xadd("events", "*", { event: "second" });
const id3 = await redis.xadd("events", "*", { event: "third" });

// Read entries after id1
const result = await redis.xread("events", id1);
// Returns only entries with id2 and id3

Limit number of entries

// Add 10 entries
for (let i = 0; i < 10; i++) {
  await redis.xadd("events", "*", { index: i });
}

// Read only 5 entries
const result = await redis.xread("events", "0", { count: 5 });
// Returns only the first 5 entries

Read from multiple streams

// Add entries to multiple streams
await redis.xadd("stream1", "*", { source: "stream1", data: "a" });
await redis.xadd("stream2", "*", { source: "stream2", data: "b" });

// Read from both streams
const result = await redis.xread(
  ["stream1", "stream2"],  // Array of keys
  ["0", "0"]               // Array of starting IDs
);

console.log(result);
// Returns:
// [
//   ["stream1", [["1678901234567-0", ["source", "stream1", "data", "a"]]]],
//   ["stream2", [["1678901234568-0", ["source", "stream2", "data", "b"]]]]
// ]

Poll for new entries

let lastId = "0";

// Initial read
const initial = await redis.xread("events", lastId);

if (initial && initial.length > 0) {
  const entries = initial[0][1] as any[];
  if (entries.length > 0) {
    lastId = entries[entries.length - 1][0]; // Update to last seen ID
  }
}

// Later, poll for new entries
const newEntries = await redis.xread("events", lastId);

Read only new entries using $

// Add some existing entries
await redis.xadd("events", "*", { existing: "data" });

// Read only entries added after this point
const futureEntries = await redis.xread("events", "$");
// Returns: [] (nothing new yet)

// Add new entry
await redis.xadd("events", "*", { new: "data" });

const newEntries = await redis.xread("events", "$");
// Still returns [] because $ means "entries newer than when the read started"

Processing stream entries

const result = await redis.xread("events", "0");

if (result && result.length > 0) {
  const [streamName, entries] = result[0] as [string, any[]];
  
  for (const [id, fields] of entries) {
    // Parse field-value pairs
    const data: Record<string, string> = {};
    for (let i = 0; i < fields.length; i += 2) {
      data[fields[i]] = fields[i + 1];
    }
    
    console.log(`Entry ${id}:`, data);
  }
}

Important Notes

Balanced Keys and IDs

When reading from multiple streams, the number of keys must match the number of IDs:
// ✅ Correct - 2 keys, 2 IDs
await redis.xread(["stream1", "stream2"], ["0", "0"]);

// ✅ Correct - 1 key, 1 ID
await redis.xread("stream1", "0");

// ❌ Error - 2 keys, 1 ID (unbalanced)
await redis.xread(["stream1", "stream2"], "0");

// ❌ Error - 2 keys, 3 IDs (unbalanced)
await redis.xread(["stream1", "stream2"], ["0", "0", "0"]);

Entry ID Positioning

XREAD returns entries after the specified ID, not including it:
const id1 = await redis.xadd("stream", "*", { n: 1 });
const id2 = await redis.xadd("stream", "*", { n: 2 });
const id3 = await redis.xadd("stream", "*", { n: 3 });

// Read after id1
const result = await redis.xread("stream", id1);
// Returns entries with id2 and id3 (NOT id1)

Special ID Values

IDMeaning
"0"Start from the beginning
"$"Only new entries (none if called once)
"<ms>-<seq>"Start after this specific ID

Return Format

The response format is:
[
  ["stream-name", [
    ["entry-id", ["field1", "value1", "field2", "value2"]],
    ["entry-id", ["field1", "value1", "field2", "value2"]]
  ]],
  // ... more streams if reading multiple
]
Note that field-value pairs are returned as a flat array, not as an object.

Count Limit

The count option limits entries per stream, not total:
// Reads up to 10 entries from stream1 AND up to 10 from stream2
await redis.xread(
  ["stream1", "stream2"],
  ["0", "0"],
  { count: 10 }
);
// May return up to 20 total entries (10 per stream)

Build docs developers (and LLMs) love