Overview
ADK-TS provides a powerful event-based streaming system that enables real-time agent responses. The Event class extends LlmResponse and serves as the fundamental unit of communication between agents and users.
Event System
The Event class represents a single interaction in the agent conversation:
import { Event } from '@iqai/adk';
class Event extends LlmResponse {
id: string; // Unique event identifier
invocationId: string; // Links events to invocation
author: string; // 'user' or agent name
timestamp: number; // Unix timestamp
content?: Content; // Message content
actions: EventActions; // Agent actions
longRunningToolIds?: Set<string>; // Async tool tracking
branch?: string; // Multi-agent branching
partial?: boolean; // Streaming chunk indicator
}
packages/adk/src/events/event.ts:25
Event Lifecycle
const event = new Event({
author: 'MyAgent',
invocationId: context.invocationId,
content: {
parts: [{ text: 'Hello!' }]
},
partial: false, // Complete response
});
// Check if event is final
if (event.isFinalResponse()) {
console.log('Conversation complete');
}
// Extract function calls
const calls = event.getFunctionCalls();
// Extract function responses
const responses = event.getFunctionResponses();
Streaming Modes
ADK-TS supports two streaming modes:
import { StreamingMode } from '@iqai/adk';
enum StreamingMode {
SSE = 'sse', // Server-Sent Events (recommended)
NONE = 'none', // No streaming
}
Enabling Streaming
import { AgentBuilder, StreamingMode } from '@iqai/adk';
const agent = new AgentBuilder()
.withName('StreamingAgent')
.withModel('gpt-4')
.buildLlm();
// Stream responses
for await (const event of agent.runAsync(context, {
streamingMode: StreamingMode.SSE,
})) {
if (event.partial) {
process.stdout.write(event.text || '');
} else {
console.log('\nComplete event:', event);
}
}
AsyncGenerator Pattern
All agent execution methods return AsyncGenerator<Event>:
async function* runAsync(
invocationContext: InvocationContext
): AsyncGenerator<Event> {
// Yield events as they occur
yield preprocessEvent;
yield llmResponseEvent;
yield functionCallEvent;
yield functionResponseEvent;
yield finalResponseEvent;
}
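The generator pattern can be exercised without the ADK itself. The sketch below uses plain objects as stand-ins for Event instances (MockEvent and mockRunAsync are illustrative names, not part of the ADK API) to show how a consumer drains the stream and picks out the final response:

```typescript
// Minimal stand-in for Event: only the fields the consumer reads.
interface MockEvent {
  author: string;
  text?: string;
  partial?: boolean;
}

// A generator yielding events in the order an agent would:
// partial chunks first, then the complete response.
async function* mockRunAsync(): AsyncGenerator<MockEvent> {
  yield { author: 'MyAgent', text: 'Hel', partial: true };
  yield { author: 'MyAgent', text: 'lo!', partial: true };
  yield { author: 'MyAgent', text: 'Hello!', partial: false };
}

async function main(): Promise<string> {
  let finalText = '';
  for await (const event of mockRunAsync()) {
    // Only keep the complete (non-partial) response text
    if (!event.partial) finalText = event.text ?? '';
  }
  return finalText;
}

main().then(text => console.log(text)); // prints "Hello!"
```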
Consuming Event Streams
for await (const event of agent.runAsync(context)) {
console.log('Event from:', event.author);
console.log('Content:', event.text);
}
Event Types
User Events
const userEvent = new Event({
author: 'user',
content: {
parts: [{ text: 'What is the weather today?' }]
},
});
// Add to session
context.session.events.push(userEvent);
Agent Response Events
// Text response
const textEvent = new Event({
author: 'WeatherAgent',
content: {
parts: [{ text: 'It is sunny and 75°F today.' }]
},
});
// With metadata
textEvent.usageMetadata = {
promptTokenCount: 150,
candidatesTokenCount: 25,
totalTokenCount: 175,
};
Function Call Events
const functionCallEvent = new Event({
author: 'WeatherAgent',
content: {
parts: [
{
functionCall: {
name: 'get_weather',
args: { location: 'San Francisco' },
id: 'call_123',
}
}
]
},
});
// Extract function calls
const calls = functionCallEvent.getFunctionCalls();
console.log(calls); // [{ name: 'get_weather', args: {...}, id: 'call_123' }]
Function Response Events
const functionResponseEvent = new Event({
author: 'tool',
content: {
parts: [
{
functionResponse: {
name: 'get_weather',
id: 'call_123',
response: {
temperature: 75,
conditions: 'sunny'
},
}
}
]
},
});
Partial Events (Streaming)
When streaming is enabled, the LLM yields partial events:
// Server-side
for await (const event of agent.runAsync(context, {
streamingMode: StreamingMode.SSE,
})) {
// Send every event to the client, partial chunks and complete events alike
res.write(`data: ${JSON.stringify(event)}\n\n`);
// Guard on partial so the stream only closes on the last complete event
if (!event.partial && event.isFinalResponse()) {
res.end();
}
}
Client-Side Streaming
// Browser/Node.js client
const eventSource = new EventSource('/api/agent/stream');
let currentText = '';
eventSource.onmessage = (message) => {
if (message.data === '[DONE]') {
console.log('Stream complete');
eventSource.close();
return;
}
// JSON.parse yields a plain object, not an Event instance, so read
// serialized fields rather than calling Event methods like isFinalResponse()
const event = JSON.parse(message.data);
if (event.partial) {
// Append partial text
currentText += event.text || '';
updateUI(currentText);
}
};
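Outside the browser, the same SSE frames can be handled without EventSource. A minimal sketch, assuming the framing used by the server examples here (each payload on a `data: ` line, stream terminated by a `[DONE]` sentinel):

```typescript
// Parse raw SSE text into payload strings, stopping at the
// '[DONE]' end-of-stream sentinel.
function parseSseFrames(raw: string): string[] {
  const payloads: string[] = [];
  for (const line of raw.split('\n')) {
    if (!line.startsWith('data: ')) continue; // skip blank lines and comments
    const payload = line.slice('data: '.length);
    if (payload === '[DONE]') break; // end of stream
    payloads.push(payload);
  }
  return payloads;
}

const frames = parseSseFrames(
  'data: {"text":"Hi","partial":true}\n\ndata: [DONE]\n\n'
);
console.log(frames.length); // 1
```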
Event Actions
import { EventActions } from '@iqai/adk';
class EventActions {
transferToAgent?: string; // Agent transfer target
skipSummarization?: boolean; // Skip event summarization
// Additional action metadata
}
const event = new Event({
author: 'RoutingAgent',
content: { parts: [{ text: 'Transferring to specialist...' }] },
actions: new EventActions(),
});
event.actions.transferToAgent = 'SpecialistAgent';
Long-Running Tools
Track asynchronous tool execution:
const event = new Event({
author: 'DataAgent',
content: {
parts: [
{
functionCall: {
name: 'process_large_dataset',
args: { dataset: 'users.csv' },
id: 'call_456',
}
}
]
},
longRunningToolIds: new Set(['call_456']),
});
if (event.longRunningToolIds?.has('call_456')) {
console.log('Tool is still running...');
}
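Application code typically mirrors this set to know when all async work has settled. A hypothetical helper (LongRunningTracker is not an ADK class) that marks call IDs as started and clears them when the matching functionResponse arrives:

```typescript
// Track pending long-running tool calls by their functionCall id.
class LongRunningTracker {
  private pending = new Set<string>();

  start(callId: string): void {
    this.pending.add(callId);
  }

  // Call when the matching functionResponse event arrives
  complete(callId: string): void {
    this.pending.delete(callId);
  }

  isRunning(callId: string): boolean {
    return this.pending.has(callId);
  }

  get allDone(): boolean {
    return this.pending.size === 0;
  }
}

const tracker = new LongRunningTracker();
tracker.start('call_456');
console.log(tracker.isRunning('call_456')); // true
tracker.complete('call_456');
console.log(tracker.allDone); // true
```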
Multi-Agent Branching
The branch field tracks agent hierarchy:
// Parent agent event
const parentEvent = new Event({
author: 'MainAgent',
branch: 'MainAgent',
content: { parts: [{ text: 'Starting analysis...' }] },
});
// Child agent event
const childEvent = new Event({
author: 'AnalysisAgent',
branch: 'MainAgent.AnalysisAgent',
content: { parts: [{ text: 'Analyzing data...' }] },
});
// Filter events by branch
const mainAgentEvents = session.events.filter(
e => e.branch === 'MainAgent'
);
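Since branch values are dot-separated paths, a whole subtree can be selected by prefix rather than exact match. A small self-contained sketch (plain objects standing in for events; eventsInBranch is an illustrative helper, not an ADK API):

```typescript
interface BranchedEvent {
  author: string;
  branch?: string;
}

// Match the branch itself plus any descendants ('MainAgent.AnalysisAgent', …)
function eventsInBranch(events: BranchedEvent[], branch: string): BranchedEvent[] {
  return events.filter(
    e => e.branch === branch || e.branch?.startsWith(branch + '.')
  );
}

const events: BranchedEvent[] = [
  { author: 'MainAgent', branch: 'MainAgent' },
  { author: 'AnalysisAgent', branch: 'MainAgent.AnalysisAgent' },
  { author: 'OtherAgent', branch: 'OtherAgent' },
];

console.log(eventsInBranch(events, 'MainAgent').length); // 2
```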
Event Manipulation
Creating Events
import { Event } from '@iqai/adk';
const event = new Event({
author: 'MyAgent',
invocationId: 'inv_123',
content: {
parts: [
{ text: 'Processing your request...' },
]
},
timestamp: Math.floor(Date.now() / 1000),
id: Event.newId(), // Generate unique ID
});
Modifying Events
// Add usage metadata
event.usageMetadata = {
promptTokenCount: 100,
candidatesTokenCount: 50,
totalTokenCount: 150,
};
// Add cache metadata
event.cacheMetadata = {
cacheName: 'context_cache_1',
fingerprint: 'abc123',
contentsCount: 5,
};
// Mark as partial
event.partial = true;
Event Queries
// Check if final
if (event.isFinalResponse()) {
console.log('This is the last event');
}
// Extract function calls
const functionCalls = event.getFunctionCalls();
if (functionCalls.length > 0) {
console.log('Agent wants to call:', functionCalls.map(fc => fc.name));
}
// Extract function responses
const functionResponses = event.getFunctionResponses();
// Check for code execution results
if (event.hasTrailingCodeExecutionResult()) {
console.log('Event contains code execution output');
}
Express.js Integration
import express from 'express';
import { AgentBuilder, StreamingMode } from '@iqai/adk';
const app = express();
app.get('/api/agent/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const agent = new AgentBuilder()
.withModel('gpt-4')
.buildLlm();
try {
for await (const event of agent.runAsync(context, {
streamingMode: StreamingMode.SSE,
})) {
// Send event to client
res.write(`data: ${JSON.stringify({
id: event.id,
author: event.author,
text: event.text,
partial: event.partial,
timestamp: event.timestamp,
})}\n\n`);
if (event.isFinalResponse()) {
res.write('data: [DONE]\n\n');
res.end();
break;
}
}
} catch (error) {
// catch binds `unknown` in strict TypeScript, so narrow before reading .message
const message = error instanceof Error ? error.message : String(error);
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
res.end();
}
});
Next.js API Route
// app/api/agent/stream/route.ts
import { AgentBuilder, StreamingMode } from '@iqai/adk';
export async function POST(request: Request) {
const { message } = await request.json();
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
const agent = new AgentBuilder()
.withModel('gpt-4')
.buildLlm();
try {
for await (const event of agent.runAsync(context, {
streamingMode: StreamingMode.SSE,
})) {
const data = `data: ${JSON.stringify(event)}\n\n`;
controller.enqueue(encoder.encode(data));
if (event.isFinalResponse()) {
controller.close();
break;
}
}
} catch (error) {
controller.error(error);
}
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
WebSocket Alternative
import { WebSocketServer } from 'ws';
import { AgentBuilder } from '@iqai/adk';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
ws.on('message', async (data) => {
const { message } = JSON.parse(data.toString());
const agent = new AgentBuilder()
.withModel('gpt-4')
.buildLlm();
for await (const event of agent.runAsync(context)) {
ws.send(JSON.stringify(event));
if (event.isFinalResponse()) {
break;
}
}
});
});
Event Storage
// Store events in session
for await (const event of agent.runAsync(context)) {
// Automatically stored in session
context.session.events.push(event);
}
// Retrieve conversation history
const history = context.session.events;
// Filter by author
const userMessages = history.filter(e => e.author === 'user');
const agentResponses = history.filter(e => e.author === 'MyAgent');
// Get latest event
const latestEvent = history[history.length - 1];
Performance Tips
- Buffer Partial Events: Accumulate small chunks before sending
- Compress Streams: Use gzip for large event payloads
- Event Batching: Group non-partial events when possible
- Connection Management: Handle client disconnections gracefully
- Memory Limits: Clear old events from sessions periodically
// Buffer small chunks
let buffer = '';
const MIN_CHUNK_SIZE = 50;
for await (const event of agent.runAsync(context, {
streamingMode: StreamingMode.SSE,
})) {
if (event.partial) {
buffer += event.text || '';
if (buffer.length >= MIN_CHUNK_SIZE) {
sendToClient(buffer);
buffer = '';
}
} else {
if (buffer) {
sendToClient(buffer);
buffer = '';
}
sendToClient(event.text || '');
}
}
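The memory-limits tip above can be implemented with a simple sliding window over the session's event array. A minimal sketch (trimEvents and StoredEvent are illustrative names, not ADK APIs):

```typescript
interface StoredEvent {
  id: string;
  timestamp: number;
}

// Keep at most `limit` events, dropping the oldest first.
function trimEvents(events: StoredEvent[], limit: number): StoredEvent[] {
  if (events.length <= limit) return events;
  return events.slice(events.length - limit);
}

// Five events, oldest first
const history: StoredEvent[] = Array.from({ length: 5 }, (_, i) => ({
  id: `evt_${i}`,
  timestamp: i,
}));

const trimmed = trimEvents(history, 3);
console.log(trimmed[0].id); // 'evt_2'
```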
Best Practices
Streaming Guidelines:
- Always handle partial events separately from complete events
- Check event.isFinalResponse() to know when the conversation ends
- Implement reconnection logic for dropped connections
- Set appropriate timeouts for long-running operations
- Clean up resources when stream completes or errors
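For the reconnection guideline, a common approach is exponential backoff with a cap. A sketch of the delay schedule (the function name and defaults are assumptions, not ADK APIs):

```typescript
// Delay before reconnect attempt N: base * 2^N, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

console.log(backoffDelayMs(0));  // 500
console.log(backoffDelayMs(3));  // 4000
console.log(backoffDelayMs(10)); // 30000 (capped)
```

A client would wait `backoffDelayMs(attempt)` after each dropped connection before creating a new EventSource, resetting `attempt` once a message arrives.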
Use StreamingMode.SSE for production applications. The NONE mode is suitable for batch processing or testing.
Debugging Events
import { Logger } from '@iqai/adk';
const logger = new Logger({ name: 'EventDebugger' });
for await (const event of agent.runAsync(context)) {
logger.debug('Event received', {
id: event.id,
author: event.author,
partial: event.partial,
contentLength: event.text?.length || 0,
hasFunctionCalls: event.getFunctionCalls().length > 0,
isFinal: event.isFinalResponse(),
});
}
Next Steps
Authentication
Secure tool access with authentication patterns
Telemetry
Monitor agents with OpenTelemetry integration