Overview
The stream persistence system provides durable storage for streaming AI responses, with support for:
- SQLite-backed storage for streams and chunks
- Resuming interrupted streams
- Watching streams from other processes
- Cancelling running streams
- Cleaning up completed streams
Stream persistence is server-only and requires Node.js. It is not available in browser environments.
Core Components
- `SqliteStreamStore`: SQLite-backed stream and chunk storage
- `StreamManager`: high-level API for stream lifecycle management
- `StreamStore`: abstract base class for custom storage backends
- `persistedWriter`: low-level writer wrapper for stream persistence
Quick Start
```typescript
import { SqliteStreamStore, StreamManager } from '@deepagents/context';

// Create a SQLite store
const store = new SqliteStreamStore('./streams.db');

// Create a stream manager
const manager = new StreamManager({ store });

// Register a new stream
const { stream, created } = await manager.register('stream-123');

// Persist a stream
await manager.persist(myReadableStream, 'stream-123', {
  strategy: 'interval',
  flushSize: 10,
});

// Watch a stream from another process
const watchStream = manager.watch('stream-123');
for await (const chunk of watchStream) {
  console.log(chunk);
}

// Cleanup when done
await manager.cleanup('stream-123');
store.close();
```
SqliteStreamStore
SQLite-backed storage for streams and chunks.
Constructor
```typescript
class SqliteStreamStore extends StreamStore {
  constructor(pathOrDb: string | DatabaseSync)
}
```
File Path

```typescript
import { SqliteStreamStore } from '@deepagents/context';

// Create or open a database file
const store = new SqliteStreamStore('./data/streams.db');
```

In-Memory

```typescript
import { SqliteStreamStore } from '@deepagents/context';

// In-memory database (useful for testing)
const store = new SqliteStreamStore(':memory:');
```

Existing Connection

```typescript
import { DatabaseSync } from 'node:sqlite';
import { SqliteStreamStore } from '@deepagents/context';

const db = new DatabaseSync('./streams.db');
const store = new SqliteStreamStore(db);
```
Methods
Create a new stream entry:

```typescript
await store.createStream({
  id: 'stream-1',
  status: 'queued',
  createdAt: Date.now(),
  startedAt: null,
  finishedAt: null,
  cancelRequestedAt: null,
  error: null,
});
```
Retrieve a stream by ID:

```typescript
const stream = await store.getStream('stream-1');
if (stream) {
  console.log(stream.status); // 'queued' | 'running' | 'completed' | 'failed' | 'cancelled'
}
```
Update stream status:

```typescript
await store.updateStreamStatus('stream-1', 'running');
await store.updateStreamStatus('stream-1', 'completed');
await store.updateStreamStatus('stream-1', 'failed', { error: 'Connection lost' });
```
Append chunks to a stream:

```typescript
await store.appendChunks([
  { streamId: 'stream-1', seq: 0, data: { type: 'text-delta', text: 'Hello' }, createdAt: Date.now() },
  { streamId: 'stream-1', seq: 1, data: { type: 'text-delta', text: ' world' }, createdAt: Date.now() },
]);
```
Retrieve chunks with pagination:

```typescript
// Get the first 100 chunks
const chunks = await store.getChunks('stream-1', 0, 100);

// Get the next 100 chunks
const nextChunks = await store.getChunks('stream-1', 100, 100);
```
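For longer streams, the pagination pattern above generalizes to a loop that drains chunks page by page. A small helper sketch (hypothetical; `fetchPage` stands in for a bound `store.getChunks(streamId, offset, limit)` call so the loop can be shown self-contained):

```typescript
// Drain all pages from a paginated source.
// `fetchPage(offset, limit)` plays the role of store.getChunks(streamId, offset, limit).
async function readAllChunks<T>(
  fetchPage: (offset: number, limit: number) => Promise<T[]>,
  pageSize = 100,
): Promise<T[]> {
  const all: T[] = [];
  let offset = 0;
  for (;;) {
    const page = await fetchPage(offset, pageSize);
    all.push(...page);
    if (page.length < pageSize) break; // last (possibly partial) page
    offset += page.length;
  }
  return all;
}
```

A short page (fewer than `pageSize` items) signals the end, so the loop never issues an extra empty request for streams whose length is an exact multiple of the page size minus one.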
Delete a stream and all its chunks:

```typescript
await store.deleteStream('stream-1');
```
Reopen a terminal stream for retry:

```typescript
const stream = await store.reopenStream('stream-1');
console.log(stream.status); // 'queued'
```

Only completed, failed, or cancelled streams can be reopened.
Close the database connection:

```typescript
store.close(); // Idempotent
```
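For prototyping a custom backend against the method surface above, a minimal in-memory sketch can help. This is illustrative only: `InMemoryStreamStore` is a hypothetical name, it does not extend the real `StreamStore` base class, and it is simplified to synchronous methods (the real store methods return Promises):

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface StreamRow {
  id: string;
  status: StreamStatus;
  createdAt: number;
  startedAt: number | null;
  finishedAt: number | null;
  cancelRequestedAt: number | null;
  error: string | null;
}

interface ChunkRow {
  streamId: string;
  seq: number;
  data: unknown;
  createdAt: number;
}

// Hypothetical in-memory store mirroring the documented method surface.
class InMemoryStreamStore {
  private streams = new Map<string, StreamRow>();
  private chunks = new Map<string, ChunkRow[]>();

  createStream(row: StreamRow): void {
    this.streams.set(row.id, { ...row });
  }

  getStream(id: string): StreamRow | undefined {
    return this.streams.get(id);
  }

  updateStreamStatus(id: string, status: StreamStatus, opts?: { error?: string }): void {
    const row = this.streams.get(id);
    if (!row) throw new Error(`unknown stream: ${id}`);
    row.status = status;
    if (opts?.error) row.error = opts.error;
  }

  appendChunks(rows: ChunkRow[]): void {
    for (const row of rows) {
      const list = this.chunks.get(row.streamId) ?? [];
      list.push(row);
      this.chunks.set(row.streamId, list);
    }
  }

  getChunks(streamId: string, fromSeq: number, limit: number): ChunkRow[] {
    const list = this.chunks.get(streamId) ?? [];
    return list.filter((c) => c.seq >= fromSeq).slice(0, limit);
  }

  deleteStream(id: string): void {
    this.streams.delete(id);
    this.chunks.delete(id);
  }
}
```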
StreamManager
High-level API for managing stream lifecycle.
Constructor
```typescript
interface StreamManagerOptions {
  store: StreamStore;
  watchPolling?: WatchStreamOptions;
  cancelPolling?: PersistCancelPollingOptions;
  onPollingEvent?: (event: StreamPollingTelemetryEvent) => void;
}

const manager = new StreamManager({
  store,
  watchPolling: {
    minMs: 25,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
    statusCheckEvery: 3,
    chunkPageSize: 128,
  },
  cancelPolling: {
    minMs: 50,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
  },
});
```
register()
Register or retrieve an existing stream:
```typescript
const { stream, created } = await manager.register('stream-1');

if (created) {
  console.log('New stream created');
} else {
  console.log('Stream already exists');
}
```
persist()
Persist a ReadableStream with automatic chunking and cancellation detection:
```typescript
import { createOpenAI } from '@ai-sdk/openai';
import { streamText } from 'ai';

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY });

const result = streamText({
  model: openai('gpt-4'),
  prompt: 'Write a story',
});

await manager.persist(result.fullStream, 'stream-1', {
  strategy: 'interval', // or 'buffered'
  flushSize: 10, // Flush every 10 chunks
  cancelPolling: {
    minMs: 100,
    maxMs: 1000,
  },
  onCancelDetected: async ({ streamId, latencyMs }) => {
    console.log(`Stream ${streamId} cancelled (latency: ${latencyMs}ms)`);
  },
});
```
Strategy: interval

Flush chunks at regular intervals:

```typescript
await manager.persist(stream, 'stream-1', {
  strategy: 'interval',
  flushSize: 10, // Flush every 10 chunks
});
```

Strategy: buffered

Buffer all chunks and flush once at the end:

```typescript
await manager.persist(stream, 'stream-1', {
  strategy: 'buffered', // All chunks flushed together
});
```
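Conceptually, the `interval` strategy amounts to buffering chunks and emitting a batch every `flushSize` items, plus a final partial batch when the stream ends. A rough sketch of that batching rule (illustrative only, not the library's actual implementation):

```typescript
// Illustrative sketch of flushSize-based batching: group a chunk sequence
// into batches of `flushSize`, with a final partial batch for the remainder.
function batchChunks<T>(chunks: T[], flushSize: number): T[][] {
  const batches: T[][] = [];
  let buffer: T[] = [];
  for (const chunk of chunks) {
    buffer.push(chunk);
    if (buffer.length >= flushSize) {
      batches.push(buffer); // flush a full batch
      buffer = [];
    }
  }
  if (buffer.length > 0) batches.push(buffer); // final partial flush
  return batches;
}
```

Under this model, `buffered` is simply the degenerate case where nothing is flushed until the end, trading write frequency for fewer database transactions.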
watch()
Watch a stream’s chunks in real-time with adaptive polling:
```typescript
const watchStream = manager.watch('stream-1', {
  minMs: 25,           // Start polling every 25ms
  maxMs: 500,          // Max polling interval 500ms
  multiplier: 2,       // Double delay on empty polls
  jitterRatio: 0.15,   // Add 15% jitter
  statusCheckEvery: 3, // Check status every 3 polls
  chunkPageSize: 128,  // Fetch 128 chunks per poll
});

for await (const chunk of watchStream) {
  console.log(chunk);
}
```
The watch stream automatically closes when:
- The stream reaches a terminal status (completed/failed/cancelled)
- The stream is deleted
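A common consumer-side pattern is to reassemble text output from `text-delta` chunks like the ones shown in the `appendChunks` example. A small helper sketch (hypothetical; it assumes chunks shaped like `{ type: 'text-delta', text: string }` and ignores everything else):

```typescript
interface TextDelta {
  type: 'text-delta';
  text: string;
}

// Join the text of all text-delta chunks, skipping other chunk types.
function joinDeltas(chunks: Array<{ type: string; text?: string }>): string {
  return chunks
    .filter((c): c is TextDelta => c.type === 'text-delta' && typeof c.text === 'string')
    .map((c) => c.text)
    .join('');
}
```

In a watcher loop, the same logic would run incrementally: append each `text-delta`'s text as it arrives rather than collecting chunks into an array first.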
cancel()
Request cancellation of a running stream:
```typescript
await manager.cancel('stream-1');
```
reopen()
Reopen a terminal stream for retry:
```typescript
try {
  await manager.persist(stream, 'stream-1');
} catch (error) {
  console.error('Stream failed, reopening...');
  await manager.reopen('stream-1');
  await manager.persist(stream, 'stream-1');
}
```
cleanup()
Delete a stream and all its chunks:
```typescript
await manager.cleanup('stream-1');
```
Polling Configuration
Adaptive Polling
Both watch and persist operations use adaptive polling with exponential backoff:
```typescript
interface PollingConfig {
  minMs: number;       // Minimum delay between polls
  maxMs: number;       // Maximum delay between polls
  multiplier: number;  // Delay multiplier on empty polls
  jitterRatio: number; // Random jitter ratio (0-1)
}
```
How it works:
1. Start with the `minMs` delay
2. On an empty poll, multiply the delay by `multiplier`
3. Cap the delay at `maxMs`
4. Add random jitter: `delay * (1 ± jitterRatio)`
5. On a non-empty poll, reset the delay to `minMs`
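The backoff rule above can be sketched as a pure function (illustrative; the actual scheduling inside `@deepagents/context` may differ):

```typescript
interface PollingConfig {
  minMs: number;
  maxMs: number;
  multiplier: number;
  jitterRatio: number;
}

// Compute the next polling delay. A non-empty poll resets the backoff;
// an empty poll backs off multiplicatively, capped at maxMs, with jitter.
function nextDelay(current: number, gotChunks: boolean, cfg: PollingConfig): number {
  if (gotChunks) return cfg.minMs;
  const backedOff = Math.min(current * cfg.multiplier, cfg.maxMs);
  // Jitter spreads concurrent watchers out: factor in [1 - jitterRatio, 1 + jitterRatio)
  const jitter = 1 + (Math.random() * 2 - 1) * cfg.jitterRatio;
  return backedOff * jitter;
}
```

With the default watch config (`minMs: 25`, `multiplier: 2`, `maxMs: 500`), an idle stream is polled at roughly 25, 50, 100, 200, 400, then 500ms intervals until new chunks arrive.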
Default Configurations
Watch Polling

```typescript
const DEFAULT_WATCH_POLLING = {
  minMs: 25,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
  statusCheckEvery: 3, // Check status every 3 polls
  chunkPageSize: 128,  // Fetch 128 chunks per page
};
```

Cancel Polling

```typescript
const DEFAULT_CANCEL_POLLING = {
  minMs: 50,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
};
```
Telemetry
Monitor polling behavior with telemetry events:
```typescript
const manager = new StreamManager({
  store,
  onPollingEvent: (event) => {
    switch (event.type) {
      case 'watch:poll':
        console.log(`Polled from seq ${event.fromSeq}, got ${event.chunkCount} chunks`);
        break;
      case 'watch:empty':
        console.log(`Empty poll, waiting ${event.delayMs}ms`);
        break;
      case 'watch:chunks':
        console.log(`Delivered ${event.delivered} chunks, last seq ${event.lastSeq}`);
        break;
      case 'watch:closed':
        console.log(`Stream closed: ${event.reason}`);
        break;
      case 'persist:cancel-poll':
        console.log(`Cancel poll: status=${event.status}, delay=${event.delayMs}ms`);
        break;
      case 'persist:cancel-detected':
        console.log(`Cancel detected! Latency: ${event.latencyMs}ms`);
        break;
    }
  },
});
```
Stream Status
Streams progress through these statuses:
```typescript
type StreamStatus =
  | 'queued'     // Registered but not started
  | 'running'    // Currently streaming
  | 'completed'  // Successfully finished
  | 'failed'     // Error occurred
  | 'cancelled'; // Cancelled by request
```
Status Transitions
- `queued`: initial state after `register()`
- `running`: entered when `persist()` starts

Terminal States
- `completed`: stream finished successfully
- `failed`: an error occurred (the error message is stored)
- `cancelled`: cancellation was requested via `cancel()`
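The reopen rule stated earlier (only completed, failed, or cancelled streams can be reopened) can be expressed as a small guard (an illustrative sketch, not the library's internal check):

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

// The three terminal states from which reopenStream() resets a stream to 'queued'.
const TERMINAL_STATUSES: ReadonlySet<StreamStatus> = new Set([
  'completed',
  'failed',
  'cancelled',
]);

// True when a stream is eligible for reopen (i.e. in a terminal state).
function canReopen(status: StreamStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}
```

Guarding this way keeps a `queued` or `running` stream from being reset mid-flight, which would otherwise let two writers race on the same sequence numbers.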
Use Cases
Long-Running Tasks

Persist long-running AI responses for reliability:

```typescript
// Start generation
const { stream } = await manager.register('report-gen-1');

// Persist in the background
manager.persist(generationStream, 'report-gen-1')
  .catch(async (error) => {
    console.error('Generation failed:', error);
    // Retry
    await manager.reopen('report-gen-1');
    await manager.persist(retryStream, 'report-gen-1');
  });

// Watch from the UI
const watchStream = manager.watch('report-gen-1');
for await (const chunk of watchStream) {
  updateUI(chunk);
}
```

Multi-Process

Stream from one process, watch from another:

```typescript
// Process A: producer
const manager = new StreamManager({ store });
await manager.persist(aiStream, 'task-1');

// Process B: consumer
const manager2 = new StreamManager({ store });
const watchStream = manager2.watch('task-1');
for await (const chunk of watchStream) {
  console.log(chunk);
}
```

Resume Interrupted

Resume streams after crashes:

```typescript
// Check for interrupted streams on startup
const stream = await store.getStream('task-1');
if (stream?.status === 'running') {
  console.log('Found interrupted stream, reopening...');
  await manager.reopen('task-1');
  await manager.persist(newStream, 'task-1');
}
```

User Cancellation

Allow users to cancel long operations:

```typescript
// Start the operation
const persistPromise = manager.persist(stream, 'user-request-1');

// User clicks the cancel button
await manager.cancel('user-request-1');

// persist() detects the cancellation and aborts
await persistPromise;
```
Best Practices
Close Database Connections

Clean up database connections when you are done:

```typescript
const store = new SqliteStreamStore('./streams.db');
try {
  // Use the store
} finally {
  store.close();
}
```
Use Unique Stream IDs

Generate unique IDs to avoid conflicts:

```typescript
import { generateId } from 'ai';

const streamId = `task-${generateId()}`;
await manager.persist(stream, streamId);
```
Cleanup Completed Streams
Remove old streams to prevent unbounded database growth:

```typescript
// After successful completion
await manager.cleanup(streamId);

// Or implement periodic cleanup (findCompletedStreamsOlderThan is app-specific)
setInterval(async () => {
  const oldStreams = await findCompletedStreamsOlderThan(7 * 24 * 60 * 60 * 1000);
  for (const id of oldStreams) {
    await manager.cleanup(id);
  }
}, 24 * 60 * 60 * 1000); // Daily
```
Handle Cancellation Gracefully
Always handle cancellation in `persist()`:

```typescript
await manager.persist(stream, streamId, {
  onCancelDetected: async ({ streamId, latencyMs }) => {
    console.log(`Cancelled: ${streamId} (${latencyMs}ms latency)`);
    // Clean up resources
    await releaseResources();
  },
});
```
Next Steps
- Messages: learn about message fragments and reminders
- Fragment Builders: complete fragment builder API reference
- StreamStore API: complete StreamStore API reference
- Renderers: transform fragments into different formats