
Overview

The Agent class provides a high-level interface for creating AI agents that can:
  • Stream responses from LLM providers
  • Execute tools and handle multi-turn conversations
  • Support steering and follow-up messages during execution
  • Manage conversation state and event subscriptions

Constructor

import { Agent } from '@mariozechner/pi-agent-core';

const agent = new Agent(options);

AgentOptions

initialState
Partial<AgentState>
Initial state for the agent including system prompt, model, thinking level, tools, and messages.
convertToLlm
(messages: AgentMessage[]) => Message[] | Promise<Message[]>
Converts AgentMessage[] to LLM-compatible Message[] before each LLM call. Default filters to user/assistant/toolResult messages.
convertToLlm: (messages) => messages.filter(
  m => m.role === 'user' || m.role === 'assistant' || m.role === 'toolResult'
)
transformContext
(messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>
Optional transform applied to context before convertToLlm. Use for context pruning, injecting external context, etc.
transformContext: async (messages) => {
  if (estimateTokens(messages) > MAX_TOKENS) {
    return pruneOldMessages(messages);
  }
  return messages;
}
steeringMode
'all' | 'one-at-a-time'
Default: 'one-at-a-time'
Controls how steering messages are delivered:
  • all: Send all steering messages at once
  • one-at-a-time: Send one steering message per turn
followUpMode
'all' | 'one-at-a-time'
Default: 'one-at-a-time'
Controls how follow-up messages are delivered:
  • all: Send all follow-up messages at once
  • one-at-a-time: Send one follow-up message per turn
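Both modes can be set at construction time. The sketch below (showing only these two options, other options omitted) configures an agent that receives all pending steering messages at once but drains follow-ups one per turn:

```typescript
import { Agent } from '@mariozechner/pi-agent-core';

// Steering interrupts are flushed together; follow-ups drain one per turn.
const agent = new Agent({
  steeringMode: 'all',
  followUpMode: 'one-at-a-time',
});
```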
streamFn
StreamFn
Custom stream function for proxy backends or custom LLM routing. Default uses streamSimple from @mariozechner/pi-ai.
sessionId
string
Optional session identifier forwarded to LLM providers. Used by providers that support session-based caching (e.g., OpenAI Codex).
getApiKey
(provider: string) => Promise<string | undefined> | string | undefined
Resolves an API key dynamically for each LLM call. Useful for expiring tokens (e.g., GitHub Copilot OAuth).
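As an illustration, a resolver might look keys up from environment variables; the mapping and variable names below are assumptions for the sketch, not part of the library:

```typescript
// Hypothetical mapping from provider name to environment variable.
const keyEnvVars: Record<string, string> = {
  openai: 'OPENAI_API_KEY',
  anthropic: 'ANTHROPIC_API_KEY',
};

// Synchronous resolvers are allowed; async ones work the same way.
function getApiKey(provider: string): string | undefined {
  const envVar = keyEnvVars[provider];
  return envVar ? process.env[envVar] : undefined;
}

// Passed at construction: new Agent({ getApiKey, ... })
```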
thinkingBudgets
ThinkingBudgets
Custom token budgets for thinking levels (token-based providers only).
transport
Transport
Default: 'sse'
Preferred transport for providers that support multiple transports ('sse' or 'responses').
maxRetryDelayMs
number
Default: 60000
Maximum delay in milliseconds to wait for a retry when the server requests a long wait. If the server’s requested delay exceeds this value, the request fails immediately. Set to 0 to disable the cap.

Properties

state

get state(): AgentState
Returns the current agent state containing:
systemPrompt
string
The system prompt used for LLM calls
model
Model<any>
The LLM model to use for generation
thinkingLevel
ThinkingLevel
Thinking/reasoning level: 'off' | 'minimal' | 'low' | 'medium' | 'high' | 'xhigh'
tools
AgentTool<any>[]
Available tools the agent can execute
messages
AgentMessage[]
Conversation history including user, assistant, and tool result messages
isStreaming
boolean
Whether the agent is currently streaming a response
streamMessage
AgentMessage | null
The partial message being streamed (null when not streaming)
pendingToolCalls
Set<string>
Set of tool call IDs currently being executed
error
string | undefined
Error message from the last failed operation
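The state object can be inspected at any time. For instance, a small helper can pull the most recent assistant reply out of state.messages; the Msg type below is a simplified stand-in for AgentMessage, covering only the fields the helper touches:

```typescript
// Simplified message shape: only the fields this helper needs.
type Msg = { role: string; content: unknown };

// Scan backwards for the most recent assistant message.
function lastAssistantMessage(messages: Msg[]): Msg | undefined {
  for (let i = messages.length - 1; i >= 0; i--) {
    if (messages[i].role === 'assistant') return messages[i];
  }
  return undefined;
}

// With a live agent: lastAssistantMessage(agent.state.messages)
```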

sessionId

get sessionId(): string | undefined
set sessionId(value: string | undefined)
Get or set the session ID used for provider caching. Update it when switching sessions (new session, branch, resume).
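For example, when resuming a stored session (the savedSession object here is hypothetical, standing in for whatever persistence layer the application uses):

```typescript
// Point the agent at the resumed session so provider-side caches stay warm.
agent.sessionId = savedSession.id;
```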

thinkingBudgets

get thinkingBudgets(): ThinkingBudgets | undefined
set thinkingBudgets(value: ThinkingBudgets | undefined)
Get or set custom thinking budgets for token-based providers.

transport

get transport(): Transport
Get the current preferred transport ('sse' or 'responses').

maxRetryDelayMs

get maxRetryDelayMs(): number | undefined
set maxRetryDelayMs(value: number | undefined)
Get or set the maximum delay to wait for server-requested retries. Set to 0 to disable the cap.

Methods

subscribe

subscribe(fn: (e: AgentEvent) => void): () => void
Subscribe to agent events. Returns an unsubscribe function.
const unsubscribe = agent.subscribe((event) => {
  switch (event.type) {
    case 'message_start':
      console.log('Message started:', event.message);
      break;
    case 'message_update':
      console.log('Message updated:', event.message);
      break;
    case 'tool_execution_start':
      console.log('Tool execution started:', event.toolName);
      break;
  }
});

// Later, unsubscribe
unsubscribe();

prompt

prompt(message: AgentMessage | AgentMessage[]): Promise<void>
prompt(input: string, images?: ImageContent[]): Promise<void>
Send a prompt to the agent and stream the response. Supports text, images, or custom AgentMessage objects.
// Text prompt
await agent.prompt('What is the capital of France?');

// Text with images
await agent.prompt('What is in this image?', [
  { type: 'image', source: { type: 'base64', media_type: 'image/png', data: '...' } }
]);

// Custom message
await agent.prompt({
  role: 'user',
  content: [{ type: 'text', text: 'Hello!' }],
  timestamp: Date.now()
});

// Multiple messages
await agent.prompt([
  { role: 'user', content: 'First message', timestamp: Date.now() },
  { role: 'user', content: 'Second message', timestamp: Date.now() }
]);
Throws an error if the agent is already streaming. Use steer() or followUp() to queue messages during execution.
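Since prompt() throws while a run is in flight, callers often branch on isStreaming. The sketch below uses a pared-down AgentLike interface (an assumption covering just the members used, so the routing logic stands alone) rather than the real Agent class:

```typescript
// Minimal surface of Agent that this helper needs.
interface AgentLike {
  state: { isStreaming: boolean };
  steer(m: { role: 'user'; content: string; timestamp: number }): void;
  prompt(m: { role: 'user'; content: string; timestamp: number }): Promise<void> | void;
}

// Route input: interrupt a running agent via steer(), otherwise start a new run.
function send(agent: AgentLike, text: string): void {
  const message = { role: 'user' as const, content: text, timestamp: Date.now() };
  if (agent.state.isStreaming) {
    agent.steer(message);
  } else {
    void agent.prompt(message);
  }
}
```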

continue

continue(): Promise<void>
Continue from current context (used for retries and resuming queued messages). The last message in context must be a user or toolResult message.
// After an error, retry the last request
await agent.continue();

steer

steer(m: AgentMessage): void
Queue a steering message to interrupt the agent mid-run. It is delivered after the current tool execution completes, and any remaining queued tool calls are skipped.
// While agent is running
agent.steer({
  role: 'user',
  content: 'Stop and focus on security instead',
  timestamp: Date.now()
});

followUp

followUp(m: AgentMessage): void
Queue a follow-up message to be processed after the agent finishes. It is delivered only once the agent has no pending tool calls or steering messages.
// Queue a follow-up question
agent.followUp({
  role: 'user',
  content: 'Can you explain that in more detail?',
  timestamp: Date.now()
});

abort

abort(): void
Abort the current streaming operation.
agent.abort();

waitForIdle

waitForIdle(): Promise<void>
Wait for the agent to finish the current operation.
await agent.waitForIdle();
console.log('Agent is idle');

reset

reset(): void
Reset the agent to initial state. Clears messages, streaming state, pending tool calls, errors, and message queues.
agent.reset();

State Mutators

setSystemPrompt(v: string): void
setModel(m: Model<any>): void
setThinkingLevel(l: ThinkingLevel): void
setTools(t: AgentTool<any>[]): void
setSteeringMode(mode: 'all' | 'one-at-a-time'): void
setFollowUpMode(mode: 'all' | 'one-at-a-time'): void
setTransport(value: Transport): void
replaceMessages(ms: AgentMessage[]): void
appendMessage(m: AgentMessage): void
clearMessages(): void
clearSteeringQueue(): void
clearFollowUpQueue(): void
clearAllQueues(): void
Update agent configuration and state.
agent.setSystemPrompt('You are a helpful coding assistant.');
agent.setModel(getModel('openai', 'gpt-4o'));
agent.setThinkingLevel('medium');
agent.setTools([myTool]);

hasQueuedMessages

hasQueuedMessages(): boolean
Check if there are any steering or follow-up messages in the queue.
if (agent.hasQueuedMessages()) {
  console.log('Messages are queued');
}

Complete Example

import { Agent } from '@mariozechner/pi-agent-core';
import { getModel } from '@mariozechner/pi-ai';
import { Type } from '@sinclair/typebox';

// Define a tool
const weatherTool = {
  label: 'Weather',
  name: 'get_weather',
  description: 'Get current weather for a location',
  parameters: Type.Object({
    location: Type.String({ description: 'City name' }),
  }),
  execute: async (toolCallId, args) => {
    // Simulate API call
    return {
      content: [{ 
        type: 'text', 
        text: `Weather in ${args.location}: Sunny, 72°F` 
      }],
      details: { temperature: 72, condition: 'sunny' }
    };
  }
};

// Create agent
const agent = new Agent({
  initialState: {
    systemPrompt: 'You are a helpful weather assistant.',
    model: getModel('openai', 'gpt-4o'),
    thinkingLevel: 'low',
    tools: [weatherTool]
  }
});

// Subscribe to events
agent.subscribe((event) => {
  if (event.type === 'message_update') {
    console.log('Assistant:', event.message.content);
  }
  if (event.type === 'tool_execution_start') {
    console.log(`Calling ${event.toolName}...`);
  }
});

// Send prompt
await agent.prompt('What is the weather in San Francisco?');

// Access results
const lastMessage = agent.state.messages[agent.state.messages.length - 1];
console.log('Final response:', lastMessage);

Advanced: Proxy Stream Function

For applications that need to route LLM calls through a backend server:
import { streamProxy } from '@mariozechner/pi-agent-core';

const agent = new Agent({
  // The arrow function must be async so that getAuthToken() can be awaited.
  streamFn: async (model, context, options) =>
    streamProxy(model, context, {
      ...options,
      authToken: await getAuthToken(),
      proxyUrl: 'https://api.example.com',
    }),
});
See the Proxy Stream Function documentation for details.
