
Overview

The stream persistence system provides durable storage for streaming AI responses. It supports:
  • SQLite-backed storage for streams and chunks
  • Resuming interrupted streams
  • Watching streams from other processes
  • Cancelling running streams
  • Cleaning up completed streams
Stream persistence is server-only and requires Node.js. It is not available in browser environments.

Core Components

SqliteStreamStore

SQLite-backed stream and chunk storage

StreamManager

High-level API for stream lifecycle management

StreamStore

Abstract base class for custom storage backends

persistedWriter

Low-level writer wrapper for stream persistence

Quick Start

import { SqliteStreamStore, StreamManager } from '@deepagents/context';

// Create a SQLite store
const store = new SqliteStreamStore('./streams.db');

// Create a stream manager
const manager = new StreamManager({ store });

// Register a new stream
const { stream, created } = await manager.register('stream-123');

// Persist a stream
await manager.persist(myReadableStream, 'stream-123', {
  strategy: 'interval',
  flushSize: 10,
});

// Watch a stream from another process
const watchStream = manager.watch('stream-123');
for await (const chunk of watchStream) {
  console.log(chunk);
}

// Cleanup when done
await manager.cleanup('stream-123');
store.close();

SqliteStreamStore

SQLite-backed storage for streams and chunks.

Constructor

class SqliteStreamStore extends StreamStore {
  constructor(pathOrDb: string | DatabaseSync)
}

import { SqliteStreamStore } from '@deepagents/context';

// Create or open database file
const store = new SqliteStreamStore('./data/streams.db');

Methods

Create a new stream entry:
await store.createStream({
  id: 'stream-1',
  status: 'queued',
  createdAt: Date.now(),
  startedAt: null,
  finishedAt: null,
  cancelRequestedAt: null,
  error: null,
});
Retrieve a stream by ID:
const stream = await store.getStream('stream-1');
if (stream) {
  console.log(stream.status); // 'queued' | 'running' | 'completed' | 'failed' | 'cancelled'
}
Update stream status:
await store.updateStreamStatus('stream-1', 'running');
await store.updateStreamStatus('stream-1', 'completed');
await store.updateStreamStatus('stream-1', 'failed', { error: 'Connection lost' });
Append chunks to a stream:
await store.appendChunks([
  { streamId: 'stream-1', seq: 0, data: { type: 'text-delta', text: 'Hello' }, createdAt: Date.now() },
  { streamId: 'stream-1', seq: 1, data: { type: 'text-delta', text: ' world' }, createdAt: Date.now() },
]);
Retrieve chunks with pagination:
// Get first 100 chunks
const chunks = await store.getChunks('stream-1', 0, 100);

// Get next 100 chunks
const nextChunks = await store.getChunks('stream-1', 100, 100);
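The paging calls above can be wrapped in a loop that reads a whole stream. The following is a minimal sketch, not part of the library: `readAllChunks` and the `ChunkReader` interface are hypothetical, and it assumes that the second argument of `getChunks` is the starting sequence number, the third is a page limit, and sequence numbers are contiguous from 0.

```typescript
// Chunk shape as used in the appendChunks example in this document.
interface StoredChunk {
  streamId: string;
  seq: number;
  data: unknown;
  createdAt: number;
}

// Hypothetical structural type: only the one call this sketch needs.
interface ChunkReader {
  getChunks(streamId: string, fromSeq: number, limit: number): Promise<StoredChunk[]>;
}

// Reads every chunk of a stream, one page at a time.
async function readAllChunks(
  reader: ChunkReader,
  streamId: string,
  pageSize: number = 100,
): Promise<StoredChunk[]> {
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be a positive integer');
  }
  const all: StoredChunk[] = [];
  let fromSeq = 0;
  for (;;) {
    const page = await reader.getChunks(streamId, fromSeq, pageSize);
    all.push(...page);
    // A short (or empty) page means there is nothing left to read.
    if (page.length < pageSize) return all;
    // Continue right after the last sequence number received.
    fromSeq = page[page.length - 1]!.seq + 1;
  }
}
```

Because a `SqliteStreamStore` exposes `getChunks`, it should be usable as the `reader` argument if the assumptions above hold; for a stream that is still running, this returns only the chunks stored so far.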
Delete a stream and all its chunks:
await store.deleteStream('stream-1');
Reopen a terminal stream for retry:
const stream = await store.reopenStream('stream-1');
console.log(stream.status); // 'queued'
Only completed, failed, or cancelled streams can be reopened.
Close the database connection:
store.close(); // Idempotent

StreamManager

High-level API for managing stream lifecycle.

Constructor

interface StreamManagerOptions {
  store: StreamStore;
  watchPolling?: WatchStreamOptions;
  cancelPolling?: PersistCancelPollingOptions;
  onPollingEvent?: (event: StreamPollingTelemetryEvent) => void;
}

const manager = new StreamManager({
  store,
  watchPolling: {
    minMs: 25,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
    statusCheckEvery: 3,
    chunkPageSize: 128,
  },
  cancelPolling: {
    minMs: 50,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
  },
});

register()

Register or retrieve an existing stream:
const { stream, created } = await manager.register('stream-1');

if (created) {
  console.log('New stream created');
} else {
  console.log('Stream already exists');
}

persist()

Persist a ReadableStream with automatic chunking and cancellation detection:
import { createOpenAI } from '@ai-sdk/openai';
import { streamText } from 'ai';

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY });

const result = streamText({
  model: openai('gpt-4'),
  prompt: 'Write a story',
});

await manager.persist(result.fullStream, 'stream-1', {
  strategy: 'interval',  // or 'buffered'
  flushSize: 10,         // Flush every 10 chunks
  cancelPolling: {
    minMs: 100,
    maxMs: 1000,
  },
  onCancelDetected: async ({ streamId, latencyMs }) => {
    console.log(`Stream ${streamId} cancelled (latency: ${latencyMs}ms)`);
  },
});
Flush chunks at regular intervals:
await manager.persist(stream, 'stream-1', {
  strategy: 'interval',
  flushSize: 10,  // Flush every 10 chunks
});
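To make "flush every `flushSize` chunks" concrete, here is a minimal batching sketch. It is an illustration only and not the library's writer: `ChunkBatcher` is a hypothetical class, and it models nothing beyond size-based batching (it does not attempt to show how the 'buffered' strategy differs).

```typescript
// Hypothetical helper: collects chunks and hands back a batch once
// flushSize chunks have accumulated.
class ChunkBatcher<T> {
  private pending: T[] = [];
  private readonly flushSize: number;

  constructor(flushSize: number) {
    if (!Number.isInteger(flushSize) || flushSize < 1) {
      throw new RangeError('flushSize must be a positive integer');
    }
    this.flushSize = flushSize;
  }

  // Adds one chunk. Returns a full batch to write when the threshold
  // is reached, otherwise null.
  push(chunk: T): T[] | null {
    this.pending.push(chunk);
    return this.pending.length >= this.flushSize ? this.drain() : null;
  }

  // Returns whatever is still pending (possibly empty) and resets the
  // buffer. Call this once when the source stream ends.
  drain(): T[] {
    const batch = this.pending;
    this.pending = [];
    return batch;
  }
}
```

A larger `flushSize` means fewer writes to the store, at the cost of watchers seeing chunks later and more chunks being lost if the process dies before a flush.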

watch()

Watch a stream’s chunks in real-time with adaptive polling:
const watchStream = manager.watch('stream-1', {
  minMs: 25,           // Start polling every 25ms
  maxMs: 500,          // Max polling interval 500ms
  multiplier: 2,       // Double delay on empty polls
  jitterRatio: 0.15,   // Add 15% jitter
  statusCheckEvery: 3, // Check status every 3 polls
  chunkPageSize: 128,  // Fetch 128 chunks per poll
});

for await (const chunk of watchStream) {
  console.log(chunk);
}
The watch stream automatically closes when:
  • The stream reaches a terminal status (completed/failed/cancelled)
  • The stream is deleted
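The two closing conditions above can be illustrated with a simplified polling loop. This is a sketch under stated assumptions, not the actual `watch()` implementation: `watchSketch` and `WatchSource` are hypothetical, the loop uses a fixed poll interval instead of adaptive backoff, and it checks the status on every empty poll rather than every `statusCheckEvery` polls.

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface WatchedChunk {
  seq: number;
  data: unknown;
}

// Hypothetical minimal source, modelled on the store calls shown in this document.
interface WatchSource {
  getStream(id: string): Promise<{ status: StreamStatus } | null | undefined>;
  getChunks(id: string, fromSeq: number, limit: number): Promise<WatchedChunk[]>;
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function* watchSketch(
  source: WatchSource,
  streamId: string,
  pollMs: number = 25,
  pageSize: number = 128,
): AsyncGenerator<unknown, void, void> {
  let nextSeq = 0;
  let sawTerminal = false;
  for (;;) {
    const page = await source.getChunks(streamId, nextSeq, pageSize);
    for (const chunk of page) {
      yield chunk.data;
      nextSeq = chunk.seq + 1;
    }
    if (page.length > 0) continue; // more may be waiting, so poll again at once
    if (sawTerminal) return;       // terminal status seen and the final poll was empty
    const stream = await source.getStream(streamId);
    if (!stream) return;           // the stream was deleted
    if (stream.status !== 'queued' && stream.status !== 'running') {
      // Terminal status: poll chunks one more time before closing, so
      // chunks written just before the status change are not missed.
      sawTerminal = true;
      continue;
    }
    await sleep(pollMs);
  }
}
```

The extra chunk poll after a terminal status is a design choice in this sketch: without it, chunks flushed between the last poll and the status update could be dropped.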

cancel()

Request cancellation of a running stream:
await manager.cancel('stream-1');

reopen()

Reopen a terminal stream for retry:
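try {
  await manager.persist(stream, 'stream-1');
} catch (error) {
  console.error('Stream failed, reopening...', error);
  await manager.reopen('stream-1');
  // The original stream has already been consumed, so pass a fresh one
  // (retryStream is a placeholder for a newly created ReadableStream)
  await manager.persist(retryStream, 'stream-1');
}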

cleanup()

Delete a stream and all its chunks:
await manager.cleanup('stream-1');

Polling Configuration

Adaptive Polling

Both watch() and the cancellation check inside persist() use adaptive polling with exponential backoff:
interface PollingConfig {
  minMs: number;        // Minimum delay between polls
  maxMs: number;        // Maximum delay between polls
  multiplier: number;   // Delay multiplier on empty polls
  jitterRatio: number;  // Random jitter ratio (0-1)
}
How it works:
  1. Start with minMs delay
  2. On empty poll, multiply delay by multiplier
  3. Cap at maxMs
  4. Add random jitter: delay * (1 ± jitterRatio)
  5. On non-empty poll, reset to minMs
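The steps above can be sketched as two small pure functions. This is an illustration rather than the library's code: `nextBaseDelay` and `withJitter` are hypothetical names, the injectable `random` parameter exists only to make the sketch testable, and applying jitter after the cap (so a jittered delay can slightly exceed `maxMs`) is an assumption about ordering.

```typescript
interface PollingConfig {
  minMs: number;
  maxMs: number;
  multiplier: number;
  jitterRatio: number;
}

// Steps 1-3 and 5: compute the un-jittered delay for the next poll.
function nextBaseDelay(currentMs: number, pollWasEmpty: boolean, cfg: PollingConfig): number {
  if (!pollWasEmpty) return cfg.minMs;                     // data arrived: reset
  return Math.min(currentMs * cfg.multiplier, cfg.maxMs);  // back off, then cap
}

// Step 4: scale the delay by a random factor in [1 - jitterRatio, 1 + jitterRatio).
function withJitter(
  baseMs: number,
  cfg: PollingConfig,
  random: () => number = Math.random,
): number {
  const offset = (random() * 2 - 1) * cfg.jitterRatio;
  return Math.max(0, Math.round(baseMs * (1 + offset)));
}
```

With `minMs: 25`, `maxMs: 500`, and `multiplier: 2`, consecutive empty polls give base delays of 25, 50, 100, 200, 400, 500, 500 ms before jitter. Jitter keeps multiple watchers of the same database from polling in lockstep.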

Default Configurations

const DEFAULT_WATCH_POLLING = {
  minMs: 25,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
  statusCheckEvery: 3,  // Check status every 3 polls
  chunkPageSize: 128,   // Fetch 128 chunks per page
};

Telemetry

Monitor polling behavior with telemetry events:
const manager = new StreamManager({
  store,
  onPollingEvent: (event) => {
    switch (event.type) {
      case 'watch:poll':
        console.log(`Polled from seq ${event.fromSeq}, got ${event.chunkCount} chunks`);
        break;
      case 'watch:empty':
        console.log(`Empty poll, waiting ${event.delayMs}ms`);
        break;
      case 'watch:chunks':
        console.log(`Delivered ${event.delivered} chunks, last seq ${event.lastSeq}`);
        break;
      case 'watch:closed':
        console.log(`Stream closed: ${event.reason}`);
        break;
      case 'persist:cancel-poll':
        console.log(`Cancel poll: status=${event.status}, delay=${event.delayMs}ms`);
        break;
      case 'persist:cancel-detected':
        console.log(`Cancel detected! Latency: ${event.latencyMs}ms`);
        break;
    }
  },
});
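Instead of logging each event, a handler can aggregate counters. The sketch below is hypothetical: `createPollingStats` is not a library function, and the local `PollingEvent` union models only the event types and fields that appear in the handler above, so the real `StreamPollingTelemetryEvent` type may carry more.

```typescript
// Assumed event shapes, limited to the fields used in this document.
type PollingEvent =
  | { type: 'watch:poll'; fromSeq: number; chunkCount: number }
  | { type: 'watch:empty'; delayMs: number }
  | { type: 'watch:chunks'; delivered: number; lastSeq: number }
  | { type: 'watch:closed'; reason: string }
  | { type: 'persist:cancel-poll'; status: string; delayMs: number }
  | { type: 'persist:cancel-detected'; latencyMs: number };

// Returns a handler plus the counters it updates.
function createPollingStats() {
  const stats = { polls: 0, emptyPolls: 0, chunksDelivered: 0, totalIdleMs: 0 };
  const onPollingEvent = (event: PollingEvent): void => {
    switch (event.type) {
      case 'watch:poll':
        stats.polls += 1;
        break;
      case 'watch:empty':
        stats.emptyPolls += 1;
        stats.totalIdleMs += event.delayMs;
        break;
      case 'watch:chunks':
        stats.chunksDelivered += event.delivered;
        break;
      default:
        break; // other events are ignored by this sketch
    }
  };
  return { stats, onPollingEvent };
}
```

A high ratio of `emptyPolls` to `polls` suggests that `minMs` or `multiplier` could be raised to reduce database load.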

Stream Status

Streams progress through these statuses:
type StreamStatus = 
  | 'queued'      // Registered but not started
  | 'running'     // Currently streaming
  | 'completed'   // Successfully finished
  | 'failed'      // Error occurred
  | 'cancelled';  // Cancelled by request

Status Transitions

  1. queued: Initial state after register()
  2. running: Entered when persist() starts
  3. Terminal states:
      • completed: Stream finished successfully
      • failed: Error occurred (includes error message)
      • cancelled: User called cancel()
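The lifecycle described above can be written down as a small transition table. This is a sketch that encodes only the transitions this document mentions (including reopening a terminal stream back to 'queued'); `isTerminal` and `canTransition` are hypothetical helpers, and the library may permit transitions that are not listed here.

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

const TERMINAL_STATUSES: ReadonlySet<StreamStatus> = new Set<StreamStatus>([
  'completed',
  'failed',
  'cancelled',
]);

// True for statuses from which a stream can only be reopened or deleted.
function isTerminal(status: StreamStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}

// Transitions described in this document; an assumption, not an exhaustive list.
const DOCUMENTED_TRANSITIONS: Record<StreamStatus, readonly StreamStatus[]> = {
  queued: ['running'],                            // persist() starts
  running: ['completed', 'failed', 'cancelled'],  // stream ends
  completed: ['queued'],                          // reopen()
  failed: ['queued'],                             // reopen()
  cancelled: ['queued'],                          // reopen()
};

function canTransition(from: StreamStatus, to: StreamStatus): boolean {
  return DOCUMENTED_TRANSITIONS[from].includes(to);
}
```

A check such as `isTerminal(stream.status)` is useful before calling `reopen()`, since only completed, failed, or cancelled streams can be reopened.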

Use Cases

Persist long-running AI responses for reliability:
// Start generation
const { stream } = await manager.register('report-gen-1');

// Persist in background
manager.persist(generationStream, 'report-gen-1')
  .catch(async (error) => {
    console.error('Generation failed:', error);
    // Retry
    await manager.reopen('report-gen-1');
    await manager.persist(retryStream, 'report-gen-1');
  });

// Watch from UI
const watchStream = manager.watch('report-gen-1');
for await (const chunk of watchStream) {
  updateUI(chunk);
}

Best Practices

Clean up database connections:
const store = new SqliteStreamStore('./streams.db');
try {
  // Use store
} finally {
  store.close();
}
Generate unique IDs to avoid conflicts:
import { generateId } from 'ai';

const streamId = `task-${generateId()}`;
await manager.persist(stream, streamId);
Remove old streams to prevent database growth:
// After successful completion
await manager.cleanup(streamId);

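// Or implement periodic cleanup
// (findCompletedStreamsOlderThan is a placeholder for your own query)
setInterval(async () => {
  try {
    const oldStreams = await findCompletedStreamsOlderThan(7 * 24 * 60 * 60 * 1000);
    for (const id of oldStreams) {
      await manager.cleanup(id);
    }
  } catch (error) {
    console.error('Periodic cleanup failed:', error);
  }
}, 24 * 60 * 60 * 1000); // Daily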
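The `findCompletedStreamsOlderThan` helper used above is not defined in this document. One way to build it is to filter stream records by status and finish time. The sketch below is hypothetical: `selectExpiredStreamIds` is an invented name, the record fields follow the `createStream` example, and how the records are listed (for example, a direct query against the SQLite file) is left open because no list method is shown here.

```typescript
// Subset of the stream record fields shown in the createStream example.
interface StreamRecord {
  id: string;
  status: 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';
  finishedAt: number | null;
}

// Returns the IDs of completed streams that finished more than maxAgeMs ago.
function selectExpiredStreamIds(
  records: readonly StreamRecord[],
  maxAgeMs: number,
  now: number = Date.now(),
): string[] {
  return records
    .filter(
      (r) =>
        r.status === 'completed' &&
        r.finishedAt !== null &&
        now - r.finishedAt > maxAgeMs,
    )
    .map((r) => r.id);
}
```

Only 'completed' streams are selected, matching the helper's name; failed and cancelled streams are kept so they can still be inspected or reopened.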
Always handle cancellation in persist:
await manager.persist(stream, streamId, {
  onCancelDetected: async ({ streamId, latencyMs }) => {
    console.log(`Cancelled: ${streamId} (${latencyMs}ms latency)`);
    // Clean up resources
    await releaseResources();
  },
});

Next Steps

Messages

Learn about message fragments and reminders

Fragment Builders

Complete fragment builder API reference

StreamStore API

Complete StreamStore API reference

Renderers

Transform fragments into different formats
