Overview
The stream persistence API provides durable storage for streaming AI responses with SQLite backend and high-level lifecycle management.
Stream persistence is server-only and requires Node.js. Not available in browser environments.
StreamStore (Abstract)
Base class for stream storage implementations.
abstract class StreamStore {
abstract createStream ( stream : StreamData ) : Promise < void >;
abstract upsertStream (
stream : StreamData
) : Promise <{ stream : StreamData ; created : boolean }>;
abstract getStream ( streamId : string ) : Promise < StreamData | undefined >;
abstract getStreamStatus (
streamId : string
) : Promise < StreamStatus | undefined >;
abstract updateStreamStatus (
streamId : string ,
status : StreamStatus ,
options ?: { error ?: string }
) : Promise < void >;
abstract appendChunks ( chunks : StreamChunkData []) : Promise < void >;
abstract getChunks (
streamId : string ,
fromSeq ?: number ,
limit ?: number
) : Promise < StreamChunkData []>;
abstract deleteStream ( streamId : string ) : Promise < void >;
abstract reopenStream ( streamId : string ) : Promise < StreamData >;
}
Types
StreamStatus
type StreamStatus =
| 'queued' // Registered but not started
| 'running' // Currently streaming
| 'completed' // Successfully finished
| 'failed' // Error occurred
| 'cancelled' ; // Cancelled by request
StreamData
interface StreamData {
id : string ; // Unique stream identifier
status : StreamStatus ; // Current status
createdAt : number ; // Unix timestamp (ms)
startedAt : number | null ; // When streaming began
finishedAt : number | null ; // When streaming ended
cancelRequestedAt : number | null ; // When cancel was requested
error : string | null ; // Error message if failed
}
StreamChunkData
interface StreamChunkData {
streamId : string ; // Stream identifier
seq : number ; // Sequential chunk number (0-based)
data : unknown ; // Chunk content (JSON-serializable)
createdAt : number ; // Unix timestamp (ms)
}
SqliteStreamStore
SQLite-backed implementation of StreamStore.
Constructor
class SqliteStreamStore extends StreamStore {
constructor ( pathOrDb : string | DatabaseSync )
}
Parameters:
pathOrDb - File path to SQLite database or existing DatabaseSync instance
Example:
import { SqliteStreamStore } from '@deepagents/context' ;
// File-based
const store = new SqliteStreamStore ( './data/streams.db' );
// In-memory (testing)
const memStore = new SqliteStreamStore ( ':memory:' );
// Existing connection
import { DatabaseSync } from 'node:sqlite' ;
const db = new DatabaseSync ( './streams.db' );
const store = new SqliteStreamStore ( db );
createStream()
Create a new stream entry.
await createStream ( stream : StreamData ): Promise < void >
Parameters:
stream - Stream data to create
Throws:
Error if stream ID already exists
Example:
await store . createStream ({
id: 'stream-1' ,
status: 'queued' ,
createdAt: Date . now (),
startedAt: null ,
finishedAt: null ,
cancelRequestedAt: null ,
error: null ,
});
upsertStream()
Create or retrieve existing stream.
await upsertStream (
stream : StreamData
): Promise < { stream : StreamData ; created : boolean } >
Parameters:
stream - Stream data to create if no stream with the same ID exists
Returns:
stream - The stream (new or existing)
created - True if newly created, false if already existed
Example:
const { stream , created } = await store . upsertStream ({
id: 'stream-1' ,
status: 'queued' ,
createdAt: Date . now (),
startedAt: null ,
finishedAt: null ,
cancelRequestedAt: null ,
error: null ,
});
if ( created ) {
console . log ( 'New stream created' );
} else {
console . log ( 'Stream already exists:' , stream . status );
}
getStream()
Retrieve stream by ID.
await getStream ( streamId : string ): Promise < StreamData | undefined >
Parameters:
streamId - Stream identifier
Returns:
Stream data or undefined if not found
Example:
const stream = await store . getStream ( 'stream-1' );
if ( stream ) {
console . log ( `Status: ${ stream . status } ` );
console . log ( `Created: ${ new Date ( stream . createdAt ) } ` );
}
getStreamStatus()
Retrieve only the stream status.
await getStreamStatus ( streamId : string ): Promise < StreamStatus | undefined >
Parameters:
streamId - Stream identifier
Returns:
Stream status or undefined if not found
Example:
const status = await store . getStreamStatus ( 'stream-1' );
if ( status === 'completed' ) {
console . log ( 'Stream finished successfully' );
}
updateStreamStatus()
Update stream status with automatic timestamp management.
await updateStreamStatus (
streamId : string ,
status : StreamStatus ,
options ?: { error? : string }
): Promise < void >
Parameters:
streamId - Stream identifier
status - New status
options - Additional options
error - Error message (for 'failed' status)
Behavior:
'running' - Sets startedAt to current time
'completed' - Sets finishedAt to current time
'failed' - Sets finishedAt and error
'cancelled' - Sets cancelRequestedAt and finishedAt
Example:
// Start streaming
await store . updateStreamStatus ( 'stream-1' , 'running' );
// Complete successfully
await store . updateStreamStatus ( 'stream-1' , 'completed' );
// Fail with error
await store . updateStreamStatus ( 'stream-1' , 'failed' , {
error: 'Connection timeout' ,
});
// Cancel
await store . updateStreamStatus ( 'stream-1' , 'cancelled' );
appendChunks()
Append chunks to a stream (atomic batch operation).
await appendChunks ( chunks : StreamChunkData []): Promise < void >
Parameters:
chunks - Array of chunks to append
Behavior:
Uses transaction for atomicity
All chunks succeed or all fail
Chunks serialized as JSON
Example:
await store . appendChunks ([
{
streamId: 'stream-1' ,
seq: 0 ,
data: { type: 'text-delta' , text: 'Hello' },
createdAt: Date . now (),
},
{
streamId: 'stream-1' ,
seq: 1 ,
data: { type: 'text-delta' , text: ' world' },
createdAt: Date . now (),
},
]);
getChunks()
Retrieve chunks with pagination.
await getChunks (
streamId : string ,
fromSeq ?: number ,
limit ?: number
): Promise < StreamChunkData [] >
Parameters:
streamId - Stream identifier
fromSeq - (Optional) Start from this sequence number (inclusive)
limit - (Optional) Maximum number of chunks to return
Returns:
Array of chunks ordered by sequence number
Example:
// Get first 100 chunks
const chunks = await store . getChunks ( 'stream-1' , 0 , 100 );
// Get next 100 chunks
const nextChunks = await store . getChunks ( 'stream-1' , 100 , 100 );
// Get all chunks from sequence 50
const remaining = await store . getChunks ( 'stream-1' , 50 );
// Get all chunks
const allChunks = await store . getChunks ( 'stream-1' );
deleteStream()
Delete a stream and all its chunks.
await deleteStream ( streamId : string ): Promise < void >
Parameters:
streamId - Stream identifier
Behavior:
Deletes stream metadata
Deletes all associated chunks (cascade)
Example:
await store . deleteStream ( 'stream-1' );
reopenStream()
Reopen a terminal stream for retry.
await reopenStream ( streamId : string ): Promise < StreamData >
Parameters:
streamId - Stream identifier
Returns:
New stream data with 'queued' status
Throws:
Error if stream not found
Error if stream not in terminal state
Behavior:
Only works on 'completed', 'failed', or 'cancelled' streams
Deletes existing stream and chunks
Creates new stream with same ID
Resets all timestamps
Example:
try {
await manager . persist ( stream , 'stream-1' );
} catch ( error ) {
console . error ( 'Stream failed, reopening...' );
const newStream = await store . reopenStream ( 'stream-1' );
console . log ( 'Reopened:' , newStream . status ); // 'queued'
}
close()
Close the database connection.
Behavior:
Closes SQLite connection
Idempotent (safe to call multiple times)
Clears prepared statement cache
Example:
const store = new SqliteStreamStore ( './streams.db' );
try {
// Use store
} finally {
store . close ();
}
StreamManager
High-level API for managing stream lifecycle.
Constructor
interface StreamManagerOptions {
store : StreamStore ;
watchPolling ?: WatchStreamOptions ;
cancelPolling ?: PersistCancelPollingOptions ;
onPollingEvent ?: ( event : StreamPollingTelemetryEvent ) => void ;
}
class StreamManager {
constructor ( options : StreamManagerOptions )
}
Parameters:
store - Stream store implementation
watchPolling - (Optional) Watch polling configuration
cancelPolling - (Optional) Cancel polling configuration
onPollingEvent - (Optional) Telemetry callback
Example:
import { SqliteStreamStore , StreamManager } from '@deepagents/context' ;
const store = new SqliteStreamStore ( './streams.db' );
const manager = new StreamManager ({
store ,
watchPolling: {
minMs: 25 ,
maxMs: 500 ,
multiplier: 2 ,
jitterRatio: 0.15 ,
statusCheckEvery: 3 ,
chunkPageSize: 128 ,
},
cancelPolling: {
minMs: 50 ,
maxMs: 500 ,
multiplier: 2 ,
jitterRatio: 0.15 ,
},
onPollingEvent : ( event ) => {
console . log ( 'Polling event:' , event );
},
});
store (property)
Access the underlying store.
Example:
const stream = await manager . store . getStream ( 'stream-1' );
register()
Register or retrieve an existing stream.
await register (
streamId : string
): Promise < { stream : StreamData ; created : boolean } >
Parameters:
streamId - Stream identifier
Returns:
stream - Stream data
created - True if newly created
Example:
const { stream , created } = await manager . register ( 'stream-1' );
if ( created ) {
console . log ( 'New stream registered' );
}
persist()
Persist a ReadableStream with automatic chunking and cancellation detection.
await persist (
stream : ReadableStream ,
streamId : string ,
options ?: PersistStreamOptions
): Promise < { streamId : string } >
Parameters:
stream - ReadableStream to persist
streamId - Stream identifier
options - (Optional) Persistence options
Options:
interface PersistStreamOptions {
strategy ?: 'interval' | 'buffered' ; // Flush strategy
flushSize ?: number ; // Chunks per flush (interval mode)
cancelPolling ?: PersistCancelPollingOptions ;
onCancelDetected ?: ( info : {
streamId : string ;
latencyMs : number | null ;
}) => void | Promise < void >;
}
Returns:
streamId - The stream identifier
Behavior:
Updates status to 'running'
Polls for cancellation in background
Persists chunks according to strategy
Updates status to 'completed' or 'failed'
Aborts on cancellation
Example:
import { streamText } from 'ai' ;
import { createOpenAI } from '@ai-sdk/openai' ;
const openai = createOpenAI ({ apiKey: process . env . OPENAI_API_KEY });
const result = streamText ({
model: openai ( 'gpt-4' ),
prompt: 'Write a story' ,
});
try {
await manager . persist ( result . fullStream , 'story-1' , {
strategy: 'interval' ,
flushSize: 10 ,
onCancelDetected : async ({ streamId , latencyMs }) => {
console . log ( `Cancelled: ${ streamId } ( ${ latencyMs } ms latency)` );
await cleanup ( streamId );
},
});
console . log ( 'Stream completed' );
} catch ( error ) {
console . error ( 'Stream failed:' , error );
}
watch()
Watch a stream's chunks in real-time.
watch (
streamId : string ,
options ?: WatchStreamOptions
): ReadableStream < StreamPart >
Parameters:
streamId - Stream identifier
options - (Optional) Watch options
Options:
interface WatchStreamOptions {
minMs ?: number ; // Min polling delay (default: 25)
maxMs ?: number ; // Max polling delay (default: 500)
multiplier ?: number ; // Delay multiplier (default: 2)
jitterRatio ?: number ; // Jitter ratio (default: 0.15)
statusCheckEvery ?: number ; // Status check frequency (default: 3)
chunkPageSize ?: number ; // Chunks per page (default: 128)
}
Returns:
ReadableStream of stream chunks
Behavior:
Adaptive polling with exponential backoff
Automatically closes on terminal status
Resets polling delay on new chunks
Example:
const watchStream = manager . watch ( 'stream-1' , {
minMs: 25 ,
maxMs: 500 ,
chunkPageSize: 128 ,
});
for await ( const chunk of watchStream ) {
console . log ( 'Chunk:' , chunk );
}
console . log ( 'Stream finished' );
cancel()
Request cancellation of a running stream.
await cancel ( streamId : string ): Promise < void >
Parameters:
streamId - Stream identifier
Behavior:
Updates status to 'cancelled'
Sets cancelRequestedAt timestamp
Persist operation detects and aborts
Example:
// Start operation
const persistPromise = manager . persist ( stream , 'long-task' );
// User clicks cancel
await manager . cancel ( 'long-task' );
// Persist detects cancellation and stops
await persistPromise ;
reopen()
Reopen a terminal stream for retry.
await reopen (
streamId : string
): Promise < { stream : StreamData ; created : boolean } >
Parameters:
streamId - Stream identifier
Returns:
stream - New stream data
created - Always true
Throws:
Error if stream not in terminal state
Example:
try {
await manager . persist ( stream , 'task-1' );
} catch ( error ) {
console . error ( 'Failed, reopening...' );
await manager . reopen ( 'task-1' );
await manager . persist ( retryStream , 'task-1' );
}
cleanup()
Delete a stream and all its chunks.
await cleanup ( streamId : string ): Promise < void >
Parameters:
streamId - Stream identifier
Example:
// After successful completion
await manager . cleanup ( 'stream-1' );
Polling Configuration
WatchPollingConfig
interface WatchPollingConfig {
minMs : number ; // Minimum delay between polls
maxMs : number ; // Maximum delay between polls
multiplier : number ; // Delay multiplier on empty polls
jitterRatio : number ; // Random jitter ratio (0-1)
statusCheckEvery : number ; // Check status every N polls
chunkPageSize : number ; // Chunks to fetch per poll
}
Default:
const DEFAULT_WATCH_POLLING = {
minMs: 25 ,
maxMs: 500 ,
multiplier: 2 ,
jitterRatio: 0.15 ,
statusCheckEvery: 3 ,
chunkPageSize: 128 ,
};
CancelPollingConfig
interface CancelPollingConfig {
minMs : number ; // Minimum delay between polls
maxMs : number ; // Maximum delay between polls
multiplier : number ; // Delay multiplier on empty polls
jitterRatio : number ; // Random jitter ratio (0-1)
}
Default:
const DEFAULT_CANCEL_POLLING = {
minMs: 50 ,
maxMs: 500 ,
multiplier: 2 ,
jitterRatio: 0.15 ,
};
Telemetry Events
StreamPollingTelemetryEvent
type StreamPollingTelemetryEvent =
| {
type : 'watch:poll' ;
streamId : string ;
fromSeq : number ;
chunkCount : number ;
statusChecked : boolean ;
}
| {
type : 'watch:empty' ;
streamId : string ;
fromSeq : number ;
delayMs : number ;
}
| {
type : 'watch:chunks' ;
streamId : string ;
delivered : number ;
lastSeq : number ;
}
| {
type : 'watch:closed' ;
streamId : string ;
reason : 'terminal' | 'missing' ;
}
| {
type : 'persist:cancel-poll' ;
streamId : string ;
delayMs : number ;
status : StreamStatus | 'missing' ;
}
| {
type : 'persist:cancel-detected' ;
streamId : string ;
latencyMs : number | null ;
};
Example:
const manager = new StreamManager ({
store ,
onPollingEvent : ( event ) => {
switch ( event . type ) {
case 'watch:poll' :
console . log (
`Polled ${ event . chunkCount } chunks from seq ${ event . fromSeq } `
);
break ;
case 'watch:empty' :
console . log ( `Empty poll, waiting ${ event . delayMs } ms` );
break ;
case 'watch:chunks' :
console . log (
`Delivered ${ event . delivered } chunks, last= ${ event . lastSeq } `
);
break ;
case 'watch:closed' :
console . log ( `Stream closed: ${ event . reason } ` );
break ;
case 'persist:cancel-detected' :
console . log ( `Cancel detected! Latency: ${ event . latencyMs } ms` );
break ;
}
},
});
Best Practices
const store = new SqliteStreamStore ( './streams.db' );
try {
const manager = new StreamManager ({ store });
// Use manager
} finally {
store . close ();
}
import { generateId } from 'ai' ;
const streamId = `task- ${ generateId () } ` ;
await manager . persist ( stream , streamId );
Cleanup Completed Streams
// After success
await manager . cleanup ( streamId );
// Or periodic cleanup
setInterval ( async () => {
const oldStreams = await findOldCompletedStreams ();
for ( const id of oldStreams ) {
await manager . cleanup ( id );
}
}, 24 * 60 * 60 * 1000 );
await manager . persist ( stream , streamId , {
onCancelDetected : async ({ streamId , latencyMs }) => {
console . log ( `Cancelled: ${ streamId } ` );
await releaseResources ();
},
});
Next Steps
Stream Persistence Guide Learn about using stream persistence
Fragment Builders Complete fragment builder API reference
Renderers API Complete renderer API reference
Messages Learn about message fragments