Overview
The @mariozechner/pi-agent-core package provides a high-level agent abstraction built on top of @mariozechner/pi-ai. It manages conversation state, executes tools automatically, handles streaming, and supports custom message types.
Stateful Agent
Manages conversation context, model, tools, and settings
Automatic Tool Execution
Validates and executes tool calls in a loop until completion
Event Streaming
Fine-grained events for UI updates (text deltas, tool execution, etc.)
Extensible Messages
Support custom message types via TypeScript declaration merging
Installation
npm install @mariozechner/pi-agent-core
Quick Start
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";
const agent = new Agent({
initialState: {
systemPrompt: "You are a helpful coding assistant.",
model: getModel("anthropic", "claude-sonnet-4-20250514"),
thinkingLevel: "medium",
tools: [],
messages: []
}
});
agent.subscribe((event) => {
if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
process.stdout.write(event.assistantMessageEvent.delta);
}
});
await agent.prompt("List the files in the current directory");
Key Features
Automatic Tool Loop
The agent automatically executes tool calls in a loop until the LLM stops requesting tools:
import { Agent, type AgentTool } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";
import { Type } from "@sinclair/typebox";
import { readFile } from "fs/promises";
const readFileTool: AgentTool = {
name: "read_file",
label: "Read File",
description: "Read a file's contents",
parameters: Type.Object({
path: Type.String({ description: "File path" })
}),
execute: async (toolCallId, params, signal, onUpdate) => {
const content = await readFile(params.path, "utf-8");
return {
content: [{ type: "text", text: content }],
details: { size: content.length }
};
}
};
const agent = new Agent({
initialState: {
model: getModel("openai", "gpt-4o"),
tools: [readFileTool]
}
});
// Agent will call read_file tool, get result, and respond
await agent.prompt("What's in config.json?");
Event System
The agent emits granular events for building responsive UIs:
agent.subscribe((event) => {
switch (event.type) {
case "agent_start":
console.log("Agent starting...");
break;
case "turn_start":
console.log("New turn (LLM call)");
break;
case "message_start":
console.log(`Message started: ${event.message.role}`);
break;
case "message_update":
// Only for assistant messages
const e = event.assistantMessageEvent;
if (e.type === "text_delta") {
process.stdout.write(e.delta);
} else if (e.type === "thinking_delta") {
// Handle thinking/reasoning
}
break;
case "message_end":
console.log(`Message complete: ${event.message.role}`);
break;
case "tool_execution_start":
console.log(`Executing: ${event.toolName}`);
break;
case "tool_execution_update":
// Tool streaming progress (if tool supports it)
break;
case "tool_execution_end":
console.log(`Tool completed: ${event.toolName}`);
break;
case "turn_end":
console.log("Turn complete", event.message, event.toolResults);
break;
case "agent_end":
console.log("Agent finished", event.messages);
break;
}
});
State Management
The agent maintains conversation state with convenient setters:
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";
const agent = new Agent({ initialState });
// Access state
console.log(agent.state.messages);
console.log(agent.state.model);
console.log(agent.state.isStreaming);
// Update state
agent.setSystemPrompt("You are a senior developer.");
agent.setModel(getModel("openai", "gpt-4o"));
agent.setThinkingLevel("high");
agent.setTools([tool1, tool2]);
// Manage messages
agent.appendMessage({ role: "user", content: "Hello", timestamp: Date.now() });
agent.replaceMessages(newMessages);
agent.clearMessages();
// Reset everything
agent.reset();
Custom Message Types
Extend AgentMessage to support app-specific message types:
import { Agent, type AgentMessage } from "@mariozechner/pi-agent-core";
// Define custom message types via declaration merging
declare module "@mariozechner/pi-agent-core" {
interface CustomAgentMessages {
notification: {
role: "notification";
text: string;
level: "info" | "warning" | "error";
timestamp: number;
};
separator: {
role: "separator";
timestamp: number;
};
}
}
// Now these are valid AgentMessage types
const msg1: AgentMessage = {
role: "notification",
text: "Build completed",
level: "info",
timestamp: Date.now()
};
const msg2: AgentMessage = {
role: "separator",
timestamp: Date.now()
};
// Filter them out before LLM sees them
const agent = new Agent({
convertToLlm: (messages) => messages.flatMap(m => {
if (m.role === "notification" || m.role === "separator") {
return []; // Filter out UI-only messages
}
return [m];
})
});
Steering and Follow-up
Interrupt or extend agent execution with queued messages:
const agent = new Agent({ initialState });
// Start long-running task
agent.prompt("Analyze all files in this project");
// While tools are running, interrupt
agent.steer({
role: "user",
content: "Stop! Only analyze TypeScript files.",
timestamp: Date.now()
});
// Queue work to happen after completion
agent.followUp({
role: "user",
content: "Now create a summary report.",
timestamp: Date.now()
});
// Configure queuing behavior
agent.setSteeringMode("one-at-a-time"); // or "all"
agent.setFollowUpMode("one-at-a-time"); // or "all"
// Clear queues
agent.clearSteeringQueue();
agent.clearFollowUpQueue();
agent.clearAllQueues();
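One way to picture the two queue modes, independent of the library. The `drain` function below is a hypothetical illustration, not part of the package's API, and the semantics are inferred from the mode names: "one-at-a-time" delivers a single queued message per opening, while "all" flushes the entire queue at once.

```typescript
type QueueMode = "one-at-a-time" | "all";

// Hypothetical sketch of how many queued messages get injected at the
// next opportunity under each mode.
function drain<T>(queue: T[], mode: QueueMode): T[] {
  if (mode === "all") return queue.splice(0, queue.length);
  const next = queue.shift();
  return next === undefined ? [] : [next];
}

const queued = ["only TypeScript files", "skip node_modules", "summarize"];
console.log(drain([...queued], "one-at-a-time")); // ["only TypeScript files"]
console.log(drain([...queued], "all")); // all three messages
```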
Context Transformation
Transform conversation context before each LLM call:
const agent = new Agent({
transformContext: async (messages, signal) => {
// Remove old messages to stay within token limits
if (messages.length > 50) {
return messages.slice(-30);
}
// Inject dynamic context
const contextMessage = {
role: "user" as const,
content: `Current time: ${new Date().toISOString()}`,
timestamp: Date.now()
};
return [contextMessage, ...messages];
}
});
Tool Streaming
Tools can stream progress updates:
const analysisTool: AgentTool = {
name: "analyze",
description: "Analyze a large file",
parameters: Type.Object({
path: Type.String()
}),
execute: async (toolCallId, params, signal, onUpdate) => {
const lines = await readFileLines(params.path);
for (let i = 0; i < lines.length; i++) {
// Check for cancellation
if (signal.aborted) break;
// Stream progress
onUpdate?.({
content: [{ type: "text", text: `Processing line ${i + 1}/${lines.length}` }],
details: { progress: (i + 1) / lines.length }
});
await processLine(lines[i]);
}
return {
content: [{ type: "text", text: "Analysis complete" }],
details: { totalLines: lines.length }
};
}
};
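The `signal` check above is standard cooperative cancellation with the web `AbortController`/`AbortSignal` API. Here is a self-contained sketch of the pattern, where `processItems` is a hypothetical stand-in for a tool's `execute()` body (not part of the package):

```typescript
// A tool must check signal.aborted between units of work itself;
// the runtime does not interrupt an async function for it.
async function processItems(items: string[], signal: AbortSignal): Promise<number> {
  let processed = 0;
  for (const item of items) {
    if (signal.aborted) break; // stop promptly once the agent aborts
    await Promise.resolve(item); // stand-in for real async work
    processed++;
  }
  return processed;
}

const controller = new AbortController();
const work = processItems(["a", "b", "c"], controller.signal);
controller.abort(); // e.g. triggered by agent.abort()
work.then((n) => console.log(`processed ${n} of 3 items before abort`));
```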
Proxy Mode (Browser/Backend)
For browser apps that need to proxy LLM calls through a backend:
import { Agent, streamProxy } from "@mariozechner/pi-agent-core";
const agent = new Agent({
streamFn: (model, context, options) =>
streamProxy(model, context, {
...options,
authToken: "user-session-token",
proxyUrl: "https://api.example.com/llm"
})
});
API Reference
Agent Class
class Agent {
constructor(config: AgentConfig);
// State
get state(): AgentState;
// Prompting
prompt(message: string | ContentBlock[] | AgentMessage): Promise<void>;
continue(): Promise<void>;
// State updates
setSystemPrompt(prompt: string): void;
setModel(model: Model<any>): void;
setThinkingLevel(level: ThinkingLevel): void;
setTools(tools: AgentTool<any>[]): void;
appendMessage(message: AgentMessage): void;
replaceMessages(messages: AgentMessage[]): void;
clearMessages(): void;
reset(): void;
// Session and budgets
sessionId: string | undefined;
thinkingBudgets: ThinkingBudgets;
// Control
abort(): void;
waitForIdle(): Promise<void>;
// Events
subscribe(listener: (event: AgentEvent) => void): () => void;
// Steering and follow-up
steer(message: AgentMessage): void;
followUp(message: AgentMessage): void;
setSteeringMode(mode: "one-at-a-time" | "all"): void;
setFollowUpMode(mode: "one-at-a-time" | "all"): void;
getSteeringMode(): "one-at-a-time" | "all";
getFollowUpMode(): "one-at-a-time" | "all";
clearSteeringQueue(): void;
clearFollowUpQueue(): void;
clearAllQueues(): void;
}
Low-Level API
For direct control without the Agent class:
import { agentLoop, agentLoopContinue } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";
const context: AgentContext = {
systemPrompt: "You are helpful.",
messages: [],
tools: []
};
const config: AgentLoopConfig = {
model: getModel("openai", "gpt-4o"),
convertToLlm: (msgs) => msgs.filter(m => ["user", "assistant", "toolResult"].includes(m.role))
};
const userMessage = { role: "user" as const, content: "Hello", timestamp: Date.now() };
for await (const event of agentLoop([userMessage], context, config)) {
console.log(event.type);
}
Example: Chat Bot
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";
import { Type } from "@sinclair/typebox";
const tools = [{
name: "get_time",
description: "Get current time",
parameters: Type.Object({}),
execute: async () => ({
content: [{ type: "text", text: new Date().toISOString() }]
})
}];
const agent = new Agent({
initialState: {
systemPrompt: "You are a helpful assistant.",
model: getModel("anthropic", "claude-sonnet-4-20250514"),
tools
}
});
agent.subscribe((event) => {
if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
process.stdout.write(event.assistantMessageEvent.delta);
} else if (event.type === "agent_end") {
console.log("\n[Done]");
}
});
// Conversation
await agent.prompt("What time is it?");
await agent.prompt("What's 2 + 2?");