
Overview

The stream persistence API provides durable storage for streaming AI responses, with a SQLite backend and high-level lifecycle management.
Stream persistence is server-only and requires Node.js; it is not available in browser environments.

StreamStore (Abstract)

Base class for stream storage implementations.
abstract class StreamStore {
  abstract createStream(stream: StreamData): Promise<void>;
  
  abstract upsertStream(
    stream: StreamData
  ): Promise<{ stream: StreamData; created: boolean }>;
  
  abstract getStream(streamId: string): Promise<StreamData | undefined>;
  
  abstract getStreamStatus(
    streamId: string
  ): Promise<StreamStatus | undefined>;
  
  abstract updateStreamStatus(
    streamId: string,
    status: StreamStatus,
    options?: { error?: string }
  ): Promise<void>;
  
  abstract appendChunks(chunks: StreamChunkData[]): Promise<void>;
  
  abstract getChunks(
    streamId: string,
    fromSeq?: number,
    limit?: number
  ): Promise<StreamChunkData[]>;
  
  abstract deleteStream(streamId: string): Promise<void>;
  
  abstract reopenStream(streamId: string): Promise<StreamData>;
}

Types

StreamStatus

type StreamStatus =
  | 'queued'      // Registered but not started
  | 'running'     // Currently streaming
  | 'completed'   // Successfully finished
  | 'failed'      // Error occurred
  | 'cancelled';  // Cancelled by request
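
Three of these statuses are terminal: reopenStream() only accepts 'completed', 'failed', or 'cancelled' streams. A small guard can make that distinction explicit in calling code. This is a minimal sketch; `isTerminalStatus` and `TERMINAL_STATUSES` are hypothetical names, not exports of the library.

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

// Hypothetical helper (not part of the documented API).
// The set mirrors the statuses that reopenStream() is documented to accept.
const TERMINAL_STATUSES: ReadonlySet<StreamStatus> = new Set<StreamStatus>([
  'completed',
  'failed',
  'cancelled',
]);

function isTerminalStatus(status: StreamStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}
```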

StreamData

interface StreamData {
  id: string;                       // Unique stream identifier
  status: StreamStatus;             // Current status
  createdAt: number;                // Unix timestamp (ms)
  startedAt: number | null;         // When streaming began
  finishedAt: number | null;        // When streaming ended
  cancelRequestedAt: number | null; // When cancel was requested
  error: string | null;             // Error message if failed
}

StreamChunkData

interface StreamChunkData {
  streamId: string;  // Stream identifier
  seq: number;       // Sequential chunk number (0-based)
  data: unknown;     // Chunk content (JSON-serializable)
  createdAt: number; // Unix timestamp (ms)
}

SqliteStreamStore

SQLite-backed implementation of StreamStore.

Constructor

class SqliteStreamStore extends StreamStore {
  constructor(pathOrDb: string | DatabaseSync)
}
Parameters:
  • pathOrDb - File path to SQLite database or existing DatabaseSync instance
Example:
import { DatabaseSync } from 'node:sqlite';
import { SqliteStreamStore } from '@deepagents/context';

// File-based
const store = new SqliteStreamStore('./data/streams.db');

// In-memory (testing)
const memStore = new SqliteStreamStore(':memory:');

// Existing connection
const db = new DatabaseSync('./streams.db');
const sharedStore = new SqliteStreamStore(db);

createStream()

Create a new stream entry.
await createStream(stream: StreamData): Promise<void>
Parameters:
  • stream - Stream metadata
Throws:
  • Error if stream ID already exists
Example:
await store.createStream({
  id: 'stream-1',
  status: 'queued',
  createdAt: Date.now(),
  startedAt: null,
  finishedAt: null,
  cancelRequestedAt: null,
  error: null,
});

upsertStream()

Create or retrieve existing stream.
await upsertStream(
  stream: StreamData
): Promise<{ stream: StreamData; created: boolean }>
Parameters:
  • stream - Stream metadata
Returns:
  • stream - The stream (new or existing)
  • created - True if newly created, false if already existed
Example:
const { stream, created } = await store.upsertStream({
  id: 'stream-1',
  status: 'queued',
  createdAt: Date.now(),
  startedAt: null,
  finishedAt: null,
  cancelRequestedAt: null,
  error: null,
});

if (created) {
  console.log('New stream created');
} else {
  console.log('Stream already exists:', stream.status);
}
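
The create-or-retrieve contract can be pictured with an in-memory map. This is only a sketch of the semantics described above, assuming an existing stream is returned unchanged rather than overwritten; `upsertInto` is a hypothetical helper, and the real store does this work in SQLite.

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface StreamData {
  id: string;
  status: StreamStatus;
  createdAt: number;
  startedAt: number | null;
  finishedAt: number | null;
  cancelRequestedAt: number | null;
  error: string | null;
}

// Hypothetical in-memory stand-in for upsertStream().
function upsertInto(
  streams: Map<string, StreamData>,
  stream: StreamData
): { stream: StreamData; created: boolean } {
  const existing = streams.get(stream.id);
  if (existing) {
    // Assumption: the stored record wins and the argument is ignored.
    return { stream: existing, created: false };
  }
  streams.set(stream.id, stream);
  return { stream, created: true };
}
```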

getStream()

Retrieve stream by ID.
await getStream(streamId: string): Promise<StreamData | undefined>
Parameters:
  • streamId - Stream identifier
Returns:
  • Stream data or undefined if not found
Example:
const stream = await store.getStream('stream-1');
if (stream) {
  console.log(`Status: ${stream.status}`);
  console.log(`Created: ${new Date(stream.createdAt)}`);
}

getStreamStatus()

Retrieve only the stream status.
await getStreamStatus(streamId: string): Promise<StreamStatus | undefined>
Parameters:
  • streamId - Stream identifier
Returns:
  • Stream status or undefined if not found
Example:
const status = await store.getStreamStatus('stream-1');
if (status === 'completed') {
  console.log('Stream finished successfully');
}

updateStreamStatus()

Update stream status with automatic timestamp management.
await updateStreamStatus(
  streamId: string,
  status: StreamStatus,
  options?: { error?: string }
): Promise<void>
Parameters:
  • streamId - Stream identifier
  • status - New status
  • options - Additional options
    • error - Error message (for 'failed' status)
Behavior:
  • 'running' - Sets startedAt to current time
  • 'completed' - Sets finishedAt to current time
  • 'failed' - Sets finishedAt and error
  • 'cancelled' - Sets cancelRequestedAt and finishedAt
Example:
// Start streaming
await store.updateStreamStatus('stream-1', 'running');

// Complete successfully
await store.updateStreamStatus('stream-1', 'completed');

// Fail with error
await store.updateStreamStatus('stream-1', 'failed', {
  error: 'Connection timeout',
});

// Cancel
await store.updateStreamStatus('stream-1', 'cancelled');
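
The timestamp rules in the Behavior list can be written as a pure function. This is a sketch of the documented rules only: `applyStatus` is a hypothetical name, and whether the real store overwrites a timestamp that is already set is not stated on this page.

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface StreamData {
  id: string;
  status: StreamStatus;
  createdAt: number;
  startedAt: number | null;
  finishedAt: number | null;
  cancelRequestedAt: number | null;
  error: string | null;
}

// Hypothetical helper: returns a copy with the status-specific fields set.
function applyStatus(
  stream: StreamData,
  status: StreamStatus,
  now: number,
  error?: string
): StreamData {
  const next: StreamData = { ...stream, status };
  switch (status) {
    case 'running':
      next.startedAt = now;
      break;
    case 'completed':
      next.finishedAt = now;
      break;
    case 'failed':
      next.finishedAt = now;
      next.error = error ?? null;
      break;
    case 'cancelled':
      next.cancelRequestedAt = now;
      next.finishedAt = now;
      break;
    case 'queued':
      break; // no timestamp rule is documented for 'queued'
  }
  return next;
}
```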

appendChunks()

Append chunks to a stream (atomic batch operation).
await appendChunks(chunks: StreamChunkData[]): Promise<void>
Parameters:
  • chunks - Array of chunks to append
Behavior:
  • Uses transaction for atomicity
  • All chunks succeed or all fail
  • Chunks serialized as JSON
Example:
await store.appendChunks([
  {
    streamId: 'stream-1',
    seq: 0,
    data: { type: 'text-delta', text: 'Hello' },
    createdAt: Date.now(),
  },
  {
    streamId: 'stream-1',
    seq: 1,
    data: { type: 'text-delta', text: ' world' },
    createdAt: Date.now(),
  },
]);
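
Because every chunk carries its own seq, the caller has to number a batch before appending it. One way to do that is sketched below; `toChunks` is a hypothetical helper, and it assumes the caller tracks the next free sequence number itself.

```typescript
interface StreamChunkData {
  streamId: string;
  seq: number;
  data: unknown;
  createdAt: number;
}

// Hypothetical helper: numbers `parts` consecutively, starting at `startSeq`.
function toChunks(
  streamId: string,
  startSeq: number,
  parts: unknown[],
  now: number = Date.now()
): StreamChunkData[] {
  return parts.map((data, index) => ({
    streamId,
    seq: startSeq + index,
    data,
    createdAt: now,
  }));
}
```

The result can be passed directly to appendChunks(), for example `await store.appendChunks(toChunks('stream-1', 2, parts))`.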

getChunks()

Retrieve chunks with pagination.
await getChunks(
  streamId: string,
  fromSeq?: number,
  limit?: number
): Promise<StreamChunkData[]>
Parameters:
  • streamId - Stream identifier
  • fromSeq - (Optional) Start from this sequence number (inclusive)
  • limit - (Optional) Maximum number of chunks to return
Returns:
  • Array of chunks ordered by sequence number
Example:
// Get first 100 chunks
const chunks = await store.getChunks('stream-1', 0, 100);

// Get next 100 chunks
const nextChunks = await store.getChunks('stream-1', 100, 100);

// Get all chunks from sequence 50
const remaining = await store.getChunks('stream-1', 50);

// Get all chunks
const allChunks = await store.getChunks('stream-1');
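
For large streams, paging can be wrapped in a loop that advances fromSeq past the last chunk received. The sketch below depends only on the getChunks() signature documented above; `ChunkReader` and `readAllChunks` are hypothetical names.

```typescript
interface StreamChunkData {
  streamId: string;
  seq: number;
  data: unknown;
  createdAt: number;
}

// Minimal slice of the store contract that this helper needs.
interface ChunkReader {
  getChunks(
    streamId: string,
    fromSeq?: number,
    limit?: number
  ): Promise<StreamChunkData[]>;
}

// Hypothetical helper: reads every chunk, one page at a time.
async function readAllChunks(
  reader: ChunkReader,
  streamId: string,
  pageSize = 100
): Promise<StreamChunkData[]> {
  const all: StreamChunkData[] = [];
  let fromSeq = 0;
  for (;;) {
    const page = await reader.getChunks(streamId, fromSeq, pageSize);
    all.push(...page);
    const last = page[page.length - 1];
    // A short (or empty) page means there is nothing left to read.
    if (!last || page.length < pageSize) return all;
    fromSeq = last.seq + 1; // fromSeq is inclusive, so skip past the last chunk
  }
}
```

Advancing from the last seq seen, rather than adding pageSize to an offset, keeps the loop correct even if sequence numbers have gaps.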

deleteStream()

Delete a stream and all its chunks.
await deleteStream(streamId: string): Promise<void>
Parameters:
  • streamId - Stream identifier
Behavior:
  • Deletes stream metadata
  • Deletes all associated chunks (cascade)
Example:
await store.deleteStream('stream-1');

reopenStream()

Reopen a terminal stream for retry.
await reopenStream(streamId: string): Promise<StreamData>
Parameters:
  • streamId - Stream identifier
Returns:
  • New stream data with ‘queued’ status
Throws:
  • Error if stream not found
  • Error if stream not in terminal state
Behavior:
  • Only works on ‘completed’, ‘failed’, or ‘cancelled’ streams
  • Deletes existing stream and chunks
  • Creates new stream with same ID
  • Resets all timestamps
Example:
try {
  await manager.persist(stream, 'stream-1');
} catch (error) {
  console.error('Stream failed, reopening...');
  const newStream = await store.reopenStream('stream-1');
  console.log('Reopened:', newStream.status); // 'queued'
}

close()

Close the database connection.
close(): void
Behavior:
  • Closes SQLite connection
  • Idempotent (safe to call multiple times)
  • Clears prepared statement cache
Example:
const store = new SqliteStreamStore('./streams.db');
try {
  // Use store
} finally {
  store.close();
}

StreamManager

High-level API for managing stream lifecycle.

Constructor

interface StreamManagerOptions {
  store: StreamStore;
  watchPolling?: WatchStreamOptions;
  cancelPolling?: PersistCancelPollingOptions;
  onPollingEvent?: (event: StreamPollingTelemetryEvent) => void;
}

class StreamManager {
  constructor(options: StreamManagerOptions)
}
Parameters:
  • store - Stream store implementation
  • watchPolling - (Optional) Watch polling configuration
  • cancelPolling - (Optional) Cancel polling configuration
  • onPollingEvent - (Optional) Telemetry callback
Example:
import { SqliteStreamStore, StreamManager } from '@deepagents/context';

const store = new SqliteStreamStore('./streams.db');
const manager = new StreamManager({
  store,
  watchPolling: {
    minMs: 25,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
    statusCheckEvery: 3,
    chunkPageSize: 128,
  },
  cancelPolling: {
    minMs: 50,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
  },
  onPollingEvent: (event) => {
    console.log('Polling event:', event);
  },
});

store (property)

Access the underlying store.
get store(): StreamStore
Example:
const stream = await manager.store.getStream('stream-1');

register()

Register or retrieve an existing stream.
await register(
  streamId: string
): Promise<{ stream: StreamData; created: boolean }>
Parameters:
  • streamId - Stream identifier
Returns:
  • stream - Stream data
  • created - True if newly created
Example:
const { stream, created } = await manager.register('stream-1');
if (created) {
  console.log('New stream registered');
}

persist()

Persist a ReadableStream with automatic chunking and cancellation detection.
await persist(
  stream: ReadableStream,
  streamId: string,
  options?: PersistStreamOptions
): Promise<{ streamId: string }>
Parameters:
  • stream - ReadableStream to persist
  • streamId - Stream identifier
  • options - (Optional) Persistence options
Options:
interface PersistStreamOptions {
  strategy?: 'interval' | 'buffered';  // Flush strategy
  flushSize?: number;                   // Chunks per flush (interval mode)
  cancelPolling?: PersistCancelPollingOptions;
  onCancelDetected?: (info: {
    streamId: string;
    latencyMs: number | null;
  }) => void | Promise<void>;
}
Returns:
  • streamId - The stream identifier
Behavior:
  • Updates status to ‘running’
  • Polls for cancellation in background
  • Persists chunks according to strategy
  • Updates status to ‘completed’ or ‘failed’
  • Aborts on cancellation
Example:
import { streamText } from 'ai';
import { createOpenAI } from '@ai-sdk/openai';

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY });
const result = streamText({
  model: openai('gpt-4'),
  prompt: 'Write a story',
});

try {
  await manager.persist(result.fullStream, 'story-1', {
    strategy: 'interval',
    flushSize: 10,
    onCancelDetected: async ({ streamId, latencyMs }) => {
      console.log(`Cancelled: ${streamId} (${latencyMs}ms latency)`);
      // cleanup() here is an application-defined helper (not shown)
      await cleanup(streamId);
    },
  });
  console.log('Stream completed');
} catch (error) {
  console.error('Stream failed:', error);
}
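
To make the role of flushSize concrete, here is one way a writer could group incoming chunks before handing them to appendChunks(). This is a sketch under assumptions: this page does not describe how persist() batches internally, and `createBatcher` is a hypothetical helper.

```typescript
// Hypothetical helper: buffers items and calls `flush` once `flushSize`
// items have accumulated, plus once more at the end for any remainder.
function createBatcher<T>(
  flushSize: number,
  flush: (batch: T[]) => Promise<void>
) {
  let buffer: T[] = [];

  async function drain(): Promise<void> {
    if (buffer.length === 0) return;
    const batch = buffer;
    buffer = []; // swap first so a failing flush cannot double-send items
    await flush(batch);
  }

  return {
    async push(item: T): Promise<void> {
      buffer.push(item);
      if (buffer.length >= flushSize) await drain();
    },
    end: drain,
  };
}
```

A larger flushSize means fewer write transactions, at the cost of watchers seeing new chunks later.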

watch()

Watch a stream's chunks in real time.
watch(
  streamId: string,
  options?: WatchStreamOptions
): ReadableStream<StreamPart>
Parameters:
  • streamId - Stream identifier
  • options - (Optional) Watch options
Options:
interface WatchStreamOptions {
  minMs?: number;           // Min polling delay (default: 25)
  maxMs?: number;           // Max polling delay (default: 500)
  multiplier?: number;      // Delay multiplier (default: 2)
  jitterRatio?: number;     // Jitter ratio (default: 0.15)
  statusCheckEvery?: number; // Status check frequency (default: 3)
  chunkPageSize?: number;   // Chunks per page (default: 128)
}
Returns:
  • ReadableStream of stream chunks
Behavior:
  • Adaptive polling with exponential backoff
  • Automatically closes on terminal status
  • Resets polling delay on new chunks
Example:
const watchStream = manager.watch('stream-1', {
  minMs: 25,
  maxMs: 500,
  chunkPageSize: 128,
});

for await (const chunk of watchStream) {
  console.log('Chunk:', chunk);
}

console.log('Stream finished');
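
The adaptive polling described in the Behavior list can be sketched as two small functions: one that grows the delay on empty polls and resets it when chunks arrive, and one that applies jitter. The exact formula the library uses is not given on this page, so the symmetric jitter below is an assumption, and `nextBaseDelay` and `withJitter` are hypothetical names.

```typescript
interface BackoffConfig {
  minMs: number;
  maxMs: number;
  multiplier: number;
  jitterRatio: number;
}

// Reset to minMs when chunks arrived; otherwise grow, capped at maxMs.
function nextBaseDelay(
  currentMs: number,
  gotChunks: boolean,
  cfg: BackoffConfig
): number {
  if (gotChunks) return cfg.minMs;
  return Math.min(cfg.maxMs, currentMs * cfg.multiplier);
}

// Assumption: jitter is symmetric, scaling the delay by a factor in
// [1 - jitterRatio, 1 + jitterRatio].
function withJitter(
  delayMs: number,
  jitterRatio: number,
  random: () => number = Math.random
): number {
  const factor = 1 + (random() * 2 - 1) * jitterRatio;
  return Math.round(delayMs * factor);
}
```

With the documented defaults (minMs 25, maxMs 500, multiplier 2), consecutive empty polls give base delays of 50, 100, 200, 400, and then 500 ms. Jitter is applied to the sleep only and is not fed back into the base delay, so it does not compound.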

cancel()

Request cancellation of a running stream.
await cancel(streamId: string): Promise<void>
Parameters:
  • streamId - Stream identifier
Behavior:
  • Updates status to ‘cancelled’
  • Sets cancelRequestedAt timestamp
  • Persist operation detects and aborts
Example:
// Start operation
const persistPromise = manager.persist(stream, 'long-task');

// User clicks cancel
await manager.cancel('long-task');

// Persist detects cancellation and stops
await persistPromise;

reopen()

Reopen a terminal stream for retry.
await reopen(
  streamId: string
): Promise<{ stream: StreamData; created: boolean }>
Parameters:
  • streamId - Stream identifier
Returns:
  • stream - New stream data
  • created - Always true
Throws:
  • Error if stream not in terminal state
Example:
try {
  await manager.persist(stream, 'task-1');
} catch (error) {
  console.error('Failed, reopening...');
  await manager.reopen('task-1');
  await manager.persist(retryStream, 'task-1');
}
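
The retry pattern above generalizes to a bounded loop. The sketch below relies only on the persist() and reopen() signatures documented here; `persistWithRetry`, `RetryTarget`, and the `makeStream` factory are hypothetical. A factory is used because a stream that has already been consumed cannot be replayed, so each attempt needs a fresh one.

```typescript
// Minimal slice of StreamManager that this helper needs.
interface RetryTarget<S> {
  persist(stream: S, streamId: string): Promise<{ streamId: string }>;
  reopen(streamId: string): Promise<unknown>;
}

// Hypothetical helper: returns the number of attempts that were needed.
async function persistWithRetry<S>(
  target: RetryTarget<S>,
  makeStream: () => S,
  streamId: string,
  maxAttempts = 3
): Promise<number> {
  for (let attempt = 1; ; attempt++) {
    try {
      await target.persist(makeStream(), streamId);
      return attempt;
    } catch (error) {
      if (attempt >= maxAttempts) throw error;
      // Assumes the failed persist() left the stream in a terminal state,
      // which is what reopen() requires.
      await target.reopen(streamId);
    }
  }
}
```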

cleanup()

Delete a stream and all its chunks.
await cleanup(streamId: string): Promise<void>
Parameters:
  • streamId - Stream identifier
Example:
// After successful completion
await manager.cleanup('stream-1');

Polling Configuration

WatchPollingConfig

interface WatchPollingConfig {
  minMs: number;           // Minimum delay between polls
  maxMs: number;           // Maximum delay between polls
  multiplier: number;      // Delay multiplier on empty polls
  jitterRatio: number;     // Random jitter ratio (0-1)
  statusCheckEvery: number; // Check status every N polls
  chunkPageSize: number;   // Chunks to fetch per poll
}
Default:
const DEFAULT_WATCH_POLLING = {
  minMs: 25,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
  statusCheckEvery: 3,
  chunkPageSize: 128,
};
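
Callers usually override only one or two fields. A sketch of merging partial options over these defaults follows, with basic sanity checks. `resolveWatchPolling` is a hypothetical helper, and the validation rules are assumptions rather than documented library behavior.

```typescript
interface WatchPollingConfig {
  minMs: number;
  maxMs: number;
  multiplier: number;
  jitterRatio: number;
  statusCheckEvery: number;
  chunkPageSize: number;
}

const DEFAULT_WATCH_POLLING: WatchPollingConfig = {
  minMs: 25,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
  statusCheckEvery: 3,
  chunkPageSize: 128,
};

// Hypothetical helper: fills in defaults and rejects inconsistent values.
function resolveWatchPolling(
  overrides: Partial<WatchPollingConfig> = {}
): WatchPollingConfig {
  const cfg: WatchPollingConfig = { ...DEFAULT_WATCH_POLLING, ...overrides };
  if (cfg.minMs <= 0 || cfg.maxMs < cfg.minMs) {
    throw new RangeError('expected 0 < minMs <= maxMs');
  }
  if (cfg.multiplier < 1) {
    throw new RangeError('multiplier must be at least 1');
  }
  if (cfg.jitterRatio < 0 || cfg.jitterRatio > 1) {
    throw new RangeError('jitterRatio must be between 0 and 1');
  }
  if (cfg.statusCheckEvery < 1 || cfg.chunkPageSize < 1) {
    throw new RangeError('statusCheckEvery and chunkPageSize must be at least 1');
  }
  return cfg;
}
```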

CancelPollingConfig

interface CancelPollingConfig {
  minMs: number;        // Minimum delay between polls
  maxMs: number;        // Maximum delay between polls
  multiplier: number;   // Delay multiplier on empty polls
  jitterRatio: number;  // Random jitter ratio (0-1)
}
Default:
const DEFAULT_CANCEL_POLLING = {
  minMs: 50,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
};

Telemetry Events

StreamPollingTelemetryEvent

type StreamPollingTelemetryEvent =
  | {
      type: 'watch:poll';
      streamId: string;
      fromSeq: number;
      chunkCount: number;
      statusChecked: boolean;
    }
  | {
      type: 'watch:empty';
      streamId: string;
      fromSeq: number;
      delayMs: number;
    }
  | {
      type: 'watch:chunks';
      streamId: string;
      delivered: number;
      lastSeq: number;
    }
  | {
      type: 'watch:closed';
      streamId: string;
      reason: 'terminal' | 'missing';
    }
  | {
      type: 'persist:cancel-poll';
      streamId: string;
      delayMs: number;
      status: StreamStatus | 'missing';
    }
  | {
      type: 'persist:cancel-detected';
      streamId: string;
      latencyMs: number | null;
    };
Example:
const manager = new StreamManager({
  store,
  onPollingEvent: (event) => {
    switch (event.type) {
      case 'watch:poll':
        console.log(
          `Polled ${event.chunkCount} chunks from seq ${event.fromSeq}`
        );
        break;
      case 'watch:empty':
        console.log(`Empty poll, waiting ${event.delayMs}ms`);
        break;
      case 'watch:chunks':
        console.log(
          `Delivered ${event.delivered} chunks, last=${event.lastSeq}`
        );
        break;
      case 'watch:closed':
        console.log(`Stream closed: ${event.reason}`);
        break;
      case 'persist:cancel-detected':
        console.log(`Cancel detected! Latency: ${event.latencyMs}ms`);
        break;
    }
  },
});
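
Logging every polling event can be noisy in production. As an alternative, the callback can feed a counter that is read out periodically. This is a minimal sketch; `createEventCounter` is a hypothetical helper that depends only on the `type` field shared by all event variants.

```typescript
// Hypothetical helper: counts telemetry events by their `type` field.
function createEventCounter() {
  const counts = new Map<string, number>();
  return {
    // Suitable for passing as `onPollingEvent`.
    handler(event: { type: string }): void {
      counts.set(event.type, (counts.get(event.type) ?? 0) + 1);
    },
    count(type: string): number {
      return counts.get(type) ?? 0;
    },
  };
}
```

A high ratio of 'watch:empty' to 'watch:chunks' events may suggest that minMs is lower than the workload needs.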

Best Practices

Close the store when you are done with it:
const store = new SqliteStreamStore('./streams.db');
try {
  const manager = new StreamManager({ store });
  // Use manager
} finally {
  store.close();
}

Generate a unique ID for each stream:
import { generateId } from 'ai';

const streamId = `task-${generateId()}`;
await manager.persist(stream, streamId);

Delete streams once their results are no longer needed:
// After success
await manager.cleanup(streamId);

// Or periodic cleanup
setInterval(async () => {
  const oldStreams = await findOldCompletedStreams();
  for (const id of oldStreams) {
    await manager.cleanup(id);
  }
}, 24 * 60 * 60 * 1000);
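
The `findOldCompletedStreams()` call above is application code, and the StreamStore contract documented here has no method for listing streams. Assuming the application keeps its own index of StreamData records, the age filter could look like the sketch below; `selectExpiredStreamIds` is a hypothetical name.

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface StreamData {
  id: string;
  status: StreamStatus;
  createdAt: number;
  startedAt: number | null;
  finishedAt: number | null;
  cancelRequestedAt: number | null;
  error: string | null;
}

// Hypothetical helper: IDs of completed streams that finished at least
// `maxAgeMs` milliseconds before `now`.
function selectExpiredStreamIds(
  streams: StreamData[],
  maxAgeMs: number,
  now: number = Date.now()
): string[] {
  return streams
    .filter(
      (s) =>
        s.status === 'completed' &&
        s.finishedAt !== null &&
        now - s.finishedAt >= maxAgeMs
    )
    .map((s) => s.id);
}
```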

Release resources when a cancellation is detected:
await manager.persist(stream, streamId, {
  onCancelDetected: async ({ streamId, latencyMs }) => {
    console.log(`Cancelled: ${streamId}`);
    await releaseResources();
  },
});

