Overview
The Pi AI toolkit provides four main functions for generating assistant messages:
stream() - Stream assistant messages with full event control
complete() - Get complete assistant message without streaming
streamSimple() - Stream with simplified reasoning options
completeSimple() - Complete with simplified reasoning options
stream()
Stream an assistant message with granular event handling.
function stream<TApi extends Api>(
  model: Model<TApi>,
  context: Context,
  options?: ProviderStreamOptions
): AssistantMessageEventStream
The model to use for generation. Get models via getModel(provider, modelId).
The conversation context including the system prompt, messages, and tools.

interface Context {
  systemPrompt?: string;
  messages: Message[];
  tools?: Tool[];
}
Optional provider-specific streaming options.
Controls randomness (0.0 to 2.0). Lower is more deterministic.
Maximum tokens to generate.
Abort signal to cancel the request.
API key for the provider. Falls back to environment variables.
transport
'sse' | 'websocket' | 'auto'
Preferred transport for providers that support multiple transports.
cacheRetention
'none' | 'short' | 'long'
default: "short"
Prompt cache retention preference. Providers map this to their supported values.
Session identifier for providers that support session-based caching.
onPayload
(payload: unknown) => void
Callback for inspecting provider payloads before sending.
Custom HTTP headers to include in API requests.
Maximum delay in milliseconds to wait for a retry when the server requests a long wait.
Optional metadata to include in API requests. Providers extract the fields they understand.
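Combining several of the named options above might look like this. This is a sketch using only the property names documented here (transport, cacheRetention, onPayload, plus the abort signal); the local type is a stand-in, not the library's actual StreamOptions definition:

```typescript
// Local mirror of the documented option names (sketch, not the library's type).
type Transport = 'sse' | 'websocket' | 'auto';
type CacheRetention = 'none' | 'short' | 'long';

interface StreamOptionsSketch {
  signal?: AbortSignal;
  transport?: Transport;
  cacheRetention?: CacheRetention; // default: 'short'
  onPayload?: (payload: unknown) => void;
}

const controller = new AbortController();

const options: StreamOptionsSketch = {
  signal: controller.signal,
  transport: 'auto',          // let the provider pick SSE or WebSocket
  cacheRetention: 'long',     // prefer long prompt-cache retention
  onPayload: (payload) => console.log('outgoing payload:', payload)
};
```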
AssistantMessageEventStream
AsyncIterable<AssistantMessageEvent>
An async iterable stream that emits events as the assistant message is generated.
Call .result() to get the final AssistantMessage after streaming completes.
Example
import { getModel, stream } from '@mariozechner/pi-ai';

const model = getModel('openai', 'gpt-4o-mini');

const s = stream(model, {
  systemPrompt: 'You are a helpful assistant.',
  messages: [{ role: 'user', content: 'Hello!' }]
});

for await (const event of s) {
  switch (event.type) {
    case 'start':
      console.log(`Starting with ${event.partial.model}`);
      break;
    case 'text_delta':
      process.stdout.write(event.delta);
      break;
    case 'thinking_delta':
      console.log('[Thinking]', event.delta);
      break;
    case 'toolcall_end':
      console.log('Tool:', event.toolCall.name, event.toolCall.arguments);
      break;
    case 'done':
      console.log('\nFinished:', event.reason);
      break;
    case 'error':
      console.error('Error:', event.error.errorMessage);
      break;
  }
}

// Get final message
const message = await s.result();
console.log('Tokens:', message.usage.totalTokens);
console.log('Cost: $', message.usage.cost.total);
complete()
Get a complete assistant message without streaming.
async function complete<TApi extends Api>(
  model: Model<TApi>,
  context: Context,
  options?: ProviderStreamOptions
): Promise<AssistantMessage>
The model to use for generation.
The conversation context.
Same options as stream().
AssistantMessage
Promise<AssistantMessage>
The complete assistant message.

interface AssistantMessage {
  role: "assistant";
  content: (TextContent | ThinkingContent | ToolCall)[];
  api: Api;
  provider: Provider;
  model: string;
  usage: Usage;
  stopReason: StopReason;
  errorMessage?: string;
  timestamp: number;
}
Example
import { getModel, complete } from '@mariozechner/pi-ai';

const model = getModel('anthropic', 'claude-3-5-haiku-20241022');

const response = await complete(model, {
  messages: [{ role: 'user', content: 'Explain TypeScript in one sentence.' }]
});

for (const block of response.content) {
  if (block.type === 'text') {
    console.log(block.text);
  }
}

console.log(`Cost: $${response.usage.cost.total.toFixed(4)}`);
streamSimple()
Stream with simplified reasoning/thinking options. Maps unified reasoning levels to provider-specific parameters.
function streamSimple<TApi extends Api>(
  model: Model<TApi>,
  context: Context,
  options?: SimpleStreamOptions
): AssistantMessageEventStream
Extends StreamOptions with reasoning support.
reasoning
'minimal' | 'low' | 'medium' | 'high' | 'xhigh'
Unified thinking level. Automatically maps to provider-specific parameters:
OpenAI: reasoning_effort
Anthropic: thinking_enabled + thinking_budget_tokens
Google: thinking.enabled + thinking.budgetTokens
Custom token budgets for thinking levels (token-based providers only).

interface ThinkingBudgets {
  minimal?: number;
  low?: number;
  medium?: number;
  high?: number;
}
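The override semantics can be sketched as a small resolver: a user-supplied budget for a level takes precedence, otherwise a default applies. The default numbers below are illustrative assumptions for the sketch, not the library's actual defaults:

```typescript
// Sketch: resolve a reasoning level to a token budget for token-based providers.
type ReasoningLevel = 'minimal' | 'low' | 'medium' | 'high';

interface ThinkingBudgets {
  minimal?: number;
  low?: number;
  medium?: number;
  high?: number;
}

// Illustrative defaults only; the library defines its own values.
const ILLUSTRATIVE_DEFAULTS: Required<ThinkingBudgets> = {
  minimal: 1024,
  low: 4096,
  medium: 8192,
  high: 16384
};

function resolveBudget(level: ReasoningLevel, overrides: ThinkingBudgets = {}): number {
  // A user-supplied budget for the level wins over the default.
  return overrides[level] ?? ILLUSTRATIVE_DEFAULTS[level];
}

console.log(resolveBudget('medium'));                   // 8192
console.log(resolveBudget('medium', { medium: 2000 })); // 2000
```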
Example
import { getModel, streamSimple } from '@mariozechner/pi-ai';

const model = getModel('openai', 'gpt-5-mini');

const s = streamSimple(model, {
  messages: [{ role: 'user', content: 'Solve: 2x + 5 = 13' }]
}, {
  reasoning: 'medium' // Maps to the appropriate provider parameter
});

for await (const event of s) {
  if (event.type === 'thinking_delta') {
    console.log('[Thinking]', event.delta);
  } else if (event.type === 'text_delta') {
    process.stdout.write(event.delta);
  }
}
completeSimple()
Get complete response with simplified reasoning options.
async function completeSimple<TApi extends Api>(
  model: Model<TApi>,
  context: Context,
  options?: SimpleStreamOptions
): Promise<AssistantMessage>
Parameters and return type are the same as streamSimple() and complete().
Example
import { getModel, completeSimple } from '@mariozechner/pi-ai';

const model = getModel('anthropic', 'claude-sonnet-4-20250514');

const response = await completeSimple(model, {
  messages: [{ role: 'user', content: 'Calculate 25 * 18' }]
}, {
  reasoning: 'high'
});

for (const block of response.content) {
  if (block.type === 'thinking') {
    console.log('Thinking:', block.thinking);
  } else if (block.type === 'text') {
    console.log('Answer:', block.text);
  }
}
Context
The Context interface represents a conversation’s state.
interface Context {
  systemPrompt?: string;
  messages: Message[];
  tools?: Tool[];
}
System-level instructions for the assistant.
Conversation history. Can include UserMessage, AssistantMessage, and ToolResultMessage.

type Message = UserMessage | AssistantMessage | ToolResultMessage;

interface UserMessage {
  role: "user";
  content: string | (TextContent | ImageContent)[];
  timestamp: number;
}
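The content union means a user message can be a plain string or an array of typed blocks. A sketch of both forms, using local stand-in types (the ImageContent field names here are assumptions for illustration, not the library's definition):

```typescript
// Local stand-ins mirroring the documented shapes (sketch).
interface TextContent { type: 'text'; text: string; }
// Field names below (data, mimeType) are assumed for illustration.
interface ImageContent { type: 'image'; data: string; mimeType: string; }

interface UserMessage {
  role: 'user';
  content: string | (TextContent | ImageContent)[];
  timestamp: number;
}

// Plain-string form
const simple: UserMessage = {
  role: 'user',
  content: 'Hello',
  timestamp: Date.now()
};

// Block form, mixing text and an image
const multimodal: UserMessage = {
  role: 'user',
  content: [
    { type: 'text', text: 'What is in this picture?' },
    { type: 'image', data: '<base64 data>', mimeType: 'image/png' }
  ],
  timestamp: Date.now()
};
```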
Context Serialization
Context objects are fully JSON-serializable:
import { Context } from '@mariozechner/pi-ai';

const context: Context = {
  systemPrompt: 'You are helpful.',
  messages: [{ role: 'user', content: 'Hello', timestamp: Date.now() }]
};

// Serialize
const json = JSON.stringify(context);
localStorage.setItem('conversation', json);

// Deserialize
const restored: Context = JSON.parse(localStorage.getItem('conversation')!);
Events
The AssistantMessageEventStream emits these event types:
start
{ type: 'start'; partial: AssistantMessage }
Stream begins. Contains initial message structure.
text_start
{ type: 'text_start'; contentIndex: number; partial: AssistantMessage }
Text block starts at the given content index.
text_delta
{ type: 'text_delta'; contentIndex: number; delta: string; partial: AssistantMessage }
Text chunk received. delta contains the new text.
text_end
{ type: 'text_end'; contentIndex: number; content: string; partial: AssistantMessage }
Text block complete. content contains the full text.
thinking_start
{ type: 'thinking_start'; contentIndex: number; partial: AssistantMessage }
Thinking block starts (for models with reasoning capabilities).
thinking_delta
{ type: 'thinking_delta'; contentIndex: number; delta: string; partial: AssistantMessage }
Thinking chunk received.
thinking_end
{ type: 'thinking_end'; contentIndex: number; content: string; partial: AssistantMessage }
Thinking block complete.
toolcall_start
{ type: 'toolcall_start'; contentIndex: number; partial: AssistantMessage }
Tool call begins.
toolcall_delta
{ type: 'toolcall_delta'; contentIndex: number; delta: string; partial: AssistantMessage }
Tool-call arguments are streaming in. partial.content[contentIndex].arguments contains the partially parsed JSON. Arguments may be incomplete during toolcall_delta; always check for field existence.
toolcall_end
{ type: 'toolcall_end'; contentIndex: number; toolCall: ToolCall; partial: AssistantMessage }
Tool call complete. toolCall contains the full parsed tool call.

interface ToolCall {
  type: "toolCall";
  id: string;
  name: string;
  arguments: Record<string, any>;
  thoughtSignature?: string; // Google-specific
}
done
{ type: 'done'; reason: StopReason; message: AssistantMessage }
Stream completed successfully. reason is "stop", "length", or "toolUse".
error
{ type: 'error'; reason: 'error' | 'aborted'; error: AssistantMessage }
An error occurred or the request was aborted. error contains the partial message and error details.
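The start/delta/end pattern above means a client can render incrementally by concatenating deltas per content index. A minimal accumulator over a simulated event sequence (this uses a simplified local event type covering only the text events, not the library's full AssistantMessageEvent):

```typescript
// Simplified local event shape covering just the text events (sketch).
type TextEvent =
  | { type: 'text_start'; contentIndex: number }
  | { type: 'text_delta'; contentIndex: number; delta: string }
  | { type: 'text_end'; contentIndex: number; content: string };

function accumulate(events: TextEvent[]): string[] {
  const blocks: string[] = [];
  for (const ev of events) {
    if (ev.type === 'text_start') {
      blocks[ev.contentIndex] = '';
    } else if (ev.type === 'text_delta') {
      blocks[ev.contentIndex] += ev.delta;
    } else {
      // text_end carries the full text; use it as the authoritative value.
      blocks[ev.contentIndex] = ev.content;
    }
  }
  return blocks;
}

const sample: TextEvent[] = [
  { type: 'text_start', contentIndex: 0 },
  { type: 'text_delta', contentIndex: 0, delta: 'Hel' },
  { type: 'text_delta', contentIndex: 0, delta: 'lo' },
  { type: 'text_end', contentIndex: 0, content: 'Hello' }
];

console.log(accumulate(sample)); // ['Hello']
```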
Stop Reasons
Every AssistantMessage has a stopReason field:
type StopReason = "stop" | "length" | "toolUse" | "error" | "aborted" ;
stop - Normal completion; the model finished its response.
length - Output hit the maximum token limit.
toolUse - Model is calling tools and expects tool results.
error - An error occurred during generation. Check the errorMessage field.
aborted - Request was cancelled via AbortSignal.
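Because StopReason is a closed union, handling it as an exhaustive switch lets the compiler flag any reason you forgot. A sketch of a dispatcher (the return values are illustrative labels, not library behavior):

```typescript
type StopReason = 'stop' | 'length' | 'toolUse' | 'error' | 'aborted';

// Sketch: decide the next step after a completion based on stopReason.
// TypeScript verifies the switch covers every member of the union.
function nextStep(reason: StopReason): string {
  switch (reason) {
    case 'stop':    return 'done';                    // normal completion
    case 'length':  return 'retry-with-higher-limit'; // response was truncated
    case 'toolUse': return 'execute-tools';           // run tools, send results back
    case 'error':   return 'inspect-errorMessage';
    case 'aborted': return 'resume-or-discard';       // partial message is usable
  }
}

console.log(nextStep('toolUse')); // 'execute-tools'
```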
Aborting Requests
Use AbortSignal to cancel in-progress requests:
import { getModel, stream } from '@mariozechner/pi-ai';

const model = getModel('openai', 'gpt-4o-mini');
const controller = new AbortController();

// Abort after 2 seconds
setTimeout(() => controller.abort(), 2000);

const s = stream(model, {
  messages: [{ role: 'user', content: 'Write a long story' }]
}, {
  signal: controller.signal
});

for await (const event of s) {
  if (event.type === 'text_delta') {
    process.stdout.write(event.delta);
  } else if (event.type === 'error' && event.reason === 'aborted') {
    console.log('\nRequest aborted');
  }
}

const response = await s.result();
if (response.stopReason === 'aborted') {
  console.log('Partial content:', response.content);
  console.log('Tokens used:', response.usage.totalTokens);
}
Aborted messages can be added to context and continued:
const context = { messages: [] };

// First request gets aborted
const partial = await complete(model, context, { signal: abortSignal });
context.messages.push(partial);

// Continue the conversation
context.messages.push({ role: 'user', content: 'Please continue' });
const continuation = await complete(model, context);