
Overview

ADK-TS provides an event-based streaming system for real-time agent responses. The Event class extends LlmResponse and is the basic unit of communication between agents and users: every message, tool call, and tool result travels as an Event.

Event System

The Event class represents a single interaction in the agent conversation:
import { Event } from '@iqai/adk';

class Event extends LlmResponse {
  id: string;                    // Unique event identifier
  invocationId: string;          // Links events to invocation
  author: string;                // 'user' or agent name
  timestamp: number;             // Unix timestamp
  content?: Content;             // Message content
  actions: EventActions;         // Agent actions
  longRunningToolIds?: Set<string>; // Async tool tracking
  branch?: string;               // Multi-agent branching
  partial?: boolean;             // Streaming chunk indicator
}
Source: packages/adk/src/events/event.ts:25

Event Lifecycle

const event = new Event({
  author: 'MyAgent',
  invocationId: context.invocationId,
  content: {
    parts: [{ text: 'Hello!' }]
  },
  partial: false, // Complete response
});

// Check if event is final
if (event.isFinalResponse()) {
  console.log('Conversation complete');
}

// Extract function calls
const calls = event.getFunctionCalls();

// Extract function responses
const responses = event.getFunctionResponses();

Streaming Modes

ADK-TS supports two streaming modes:
import { StreamingMode } from '@iqai/adk';

enum StreamingMode {
  SSE = 'sse',        // Server-Sent Events (recommended)
  NONE = 'none',      // No streaming
}

Enabling Streaming

import { AgentBuilder, StreamingMode } from '@iqai/adk';

const agent = new AgentBuilder()
  .withName('StreamingAgent')
  .withModel('gpt-4')
  .buildLlm();

// Stream responses
for await (const event of agent.runAsync(context, {
  streamingMode: StreamingMode.SSE,
})) {
  if (event.partial) {
    process.stdout.write(event.text || '');
  } else {
    console.log('\nComplete event:', event);
  }
}

AsyncGenerator Pattern

All agent execution methods return AsyncGenerator<Event>:
async function* runAsync(
  invocationContext: InvocationContext
): AsyncGenerator<Event> {
  // Yield events as they occur
  yield preprocessEvent;
  yield llmResponseEvent;
  yield functionCallEvent;
  yield functionResponseEvent;
  yield finalResponseEvent;
}

Consuming Event Streams

for await (const event of agent.runAsync(context)) {
  console.log('Event from:', event.author);
  console.log('Content:', event.text);
}
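The consumption loop can be tried without a model or API key by substituting a stand-in generator for runAsync. The sketch below is illustrative only: StreamedEvent is a hypothetical, simplified shape (not the real Event class), fakeRunAsync is a made-up stand-in, and it assumes that the complete event repeats the full text of the preceding partial chunks.

```typescript
// Hypothetical, simplified stand-in for the ADK-TS Event class.
interface StreamedEvent {
  author: string;
  text?: string;
  partial?: boolean;
}

// Stand-in for agent.runAsync(): two partial chunks, then the complete event.
async function* fakeRunAsync(): AsyncGenerator<StreamedEvent> {
  yield { author: 'MyAgent', text: 'Hel', partial: true };
  yield { author: 'MyAgent', text: 'lo!', partial: true };
  yield { author: 'MyAgent', text: 'Hello!', partial: false };
}

// Collects the concatenated partial chunks and the last complete text.
async function collectText(
  stream: AsyncIterable<StreamedEvent>,
): Promise<{ streamed: string; complete: string }> {
  let streamed = '';
  let complete = '';
  for await (const event of stream) {
    if (event.partial) {
      streamed += event.text ?? '';
    } else {
      complete = event.text ?? '';
    }
  }
  return { streamed, complete };
}
```

Because collectText accepts any AsyncIterable, the same function should work unchanged when handed a real event stream whose events expose text and partial.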

Event Types

User Events

const userEvent = new Event({
  author: 'user',
  content: {
    parts: [{ text: 'What is the weather today?' }]
  },
});

// Add to session
context.session.events.push(userEvent);

Agent Response Events

// Text response
const textEvent = new Event({
  author: 'WeatherAgent',
  content: {
    parts: [{ text: 'It is sunny and 75°F today.' }]
  },
});

// With metadata
textEvent.usageMetadata = {
  promptTokenCount: 150,
  candidatesTokenCount: 25,
  totalTokenCount: 175,
};

Function Call Events

const functionCallEvent = new Event({
  author: 'WeatherAgent',
  content: {
    parts: [
      {
        functionCall: {
          name: 'get_weather',
          args: { location: 'San Francisco' },
          id: 'call_123',
        }
      }
    ]
  },
});

// Extract function calls
const calls = functionCallEvent.getFunctionCalls();
console.log(calls); // [{ name: 'get_weather', args: {...}, id: 'call_123' }]

Function Response Events

const functionResponseEvent = new Event({
  author: 'tool',
  content: {
    parts: [
      {
        functionResponse: {
          name: 'get_weather',
          id: 'call_123',
          response: { 
            temperature: 75, 
            conditions: 'sunny' 
          },
        }
      }
    ]
  },
});
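The id field is what ties a function response back to the call that produced it. As a sketch of how the two lists returned by getFunctionCalls() and getFunctionResponses() might be joined, the helper below pairs them by id. The SketchCall and SketchResponse shapes and the pairCallsWithResponses name are hypothetical and only mirror the fields used in the examples above.

```typescript
// Hypothetical minimal shapes mirroring the fields shown in the examples above.
interface SketchCall {
  name: string;
  args: Record<string, unknown>;
  id?: string;
}
interface SketchResponse {
  name: string;
  id?: string;
  response: Record<string, unknown>;
}
interface CallResult {
  call: SketchCall;
  response: SketchResponse | undefined; // undefined while the tool has not answered
}

// Pairs each call with the response carrying the same id, if one has arrived.
function pairCallsWithResponses(
  calls: SketchCall[],
  responses: SketchResponse[],
): CallResult[] {
  const byId = new Map<string, SketchResponse>();
  for (const r of responses) {
    if (r.id !== undefined) byId.set(r.id, r);
  }
  return calls.map((call) => ({
    call,
    response: call.id !== undefined ? byId.get(call.id) : undefined,
  }));
}

const pairs = pairCallsWithResponses(
  [
    { name: 'get_weather', args: { location: 'San Francisco' }, id: 'call_123' },
    { name: 'get_weather', args: { location: 'Oslo' }, id: 'call_124' },
  ],
  [{ name: 'get_weather', id: 'call_123', response: { temperature: 75, conditions: 'sunny' } }],
);
```

Here the first pair has a response and the second does not, which is how an unanswered call would show up.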

Partial Events (Streaming)

When streaming is enabled, the LLM yields partial events:
// Server-side
for await (const event of agent.runAsync(context, {
  streamingMode: StreamingMode.SSE,
})) {
  // Partial chunks and complete events share the same SSE framing;
  // the client tells them apart through event.partial
  res.write(`data: ${JSON.stringify(event)}\n\n`);

  // Close the response once the final event has been written
  if (event.isFinalResponse()) {
    res.end();
    break;
  }
}
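The data: prefix and the blank line after each write come from the Server-Sent Events wire format. A small formatting helper keeps that framing in one place. This is a sketch: toSseFrame is a hypothetical name, not part of ADK-TS. The id and event field names are defined by the SSE format itself.

```typescript
// Formats one Server-Sent Events frame.
function toSseFrame(
  data: unknown,
  options: { id?: string; event?: string } = {},
): string {
  const lines: string[] = [];
  if (options.id !== undefined) lines.push(`id: ${options.id}`);
  if (options.event !== undefined) lines.push(`event: ${options.event}`);
  // Strings are sent as-is; everything else is JSON-encoded.
  const payload = typeof data === 'string' ? data : JSON.stringify(data);
  // A payload containing newlines must be split across several data: lines.
  for (const line of payload.split('\n')) lines.push(`data: ${line}`);
  // A blank line terminates the frame.
  return lines.join('\n') + '\n\n';
}

const frame = toSseFrame({ text: 'Hi', partial: true }, { id: 'evt_1' });
```

Note that when an event name is set, browsers dispatch the message to listeners registered with addEventListener for that name rather than to onmessage.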

Client-Side Streaming

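// Browser/Node.js client
const eventSource = new EventSource('/api/agent/stream');

let currentText = '';

eventSource.onmessage = (message) => {
  // The Express example on this page ends the stream with a [DONE] sentinel,
  // which is not valid JSON and must be handled before parsing
  if (message.data === '[DONE]') {
    console.log('Stream complete');
    eventSource.close();
    return;
  }

  // JSON.parse returns a plain object, so Event methods such as
  // isFinalResponse() are not available on the client
  const event = JSON.parse(message.data);

  if (event.partial) {
    // Append partial text
    currentText += event.text || '';
    updateUI(currentText);
  } else {
    // Complete event received; the stream stays open until [DONE] arrives
    console.log('Complete event from:', event.author);
  }
};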
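Message handlers are awkward to unit test, so the accumulation logic can be pulled into a pure function and called from onmessage. The sketch below assumes the server ends the stream with a [DONE] sentinel, as the Express example on this page does, and that a complete event carries the full text. StreamState, WireEvent, and applyMessage are hypothetical names.

```typescript
interface StreamState {
  text: string;
  done: boolean;
}

// Hypothetical wire shape: the plain JSON the server wrote, not an Event instance.
interface WireEvent {
  text?: string;
  partial?: boolean;
}

// Returns the next state for one SSE message payload.
function applyMessage(state: StreamState, data: string): StreamState {
  // The sentinel is not JSON, so check it before parsing.
  if (data === '[DONE]') return { ...state, done: true };

  const event = JSON.parse(data) as WireEvent;
  if (event.partial) {
    return { text: state.text + (event.text ?? ''), done: false };
  }
  // Assumption: a complete event repeats the full text, so it replaces the chunks.
  return { text: event.text ?? state.text, done: false };
}

const finalState = [
  '{"text":"Hel","partial":true}',
  '{"text":"lo!","partial":true}',
  '{"text":"Hello!","partial":false}',
  '[DONE]',
].reduce(applyMessage, { text: '', done: false });
```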

Event Actions

import { EventActions } from '@iqai/adk';

class EventActions {
  transferToAgent?: string;      // Agent transfer target
  skipSummarization?: boolean;   // Skip event summarization
  // Additional action metadata
}

const event = new Event({
  author: 'RoutingAgent',
  content: { parts: [{ text: 'Transferring to specialist...' }] },
  actions: new EventActions(),
});

event.actions.transferToAgent = 'SpecialistAgent';

Long-Running Tools

Track asynchronous tool execution:
const event = new Event({
  author: 'DataAgent',
  content: {
    parts: [
      {
        functionCall: {
          name: 'process_large_dataset',
          args: { dataset: 'users.csv' },
          id: 'call_456',
        }
      }
    ]
  },
  longRunningToolIds: new Set(['call_456']),
});

if (event.longRunningToolIds?.has('call_456')) {
  console.log('Tool is still running...');
}
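Across a whole session, a long-running call stays open until a function response with the same id appears. One way to find the calls that are still open is sketched below. ToolEvent and ToolPart are hypothetical, reduced shapes, and the sketch assumes that a matching functionResponse is what marks a long-running tool as finished.

```typescript
// Hypothetical, reduced shapes: only the fields this sketch reads.
interface ToolPart {
  functionCall?: { name: string; id?: string };
  functionResponse?: { name: string; id?: string };
}
interface ToolEvent {
  content?: { parts?: ToolPart[] };
  longRunningToolIds?: Set<string>;
}

// Walks the history in order and returns the long-running call ids
// that have not yet received a function response.
function pendingLongRunningIds(events: ToolEvent[]): Set<string> {
  const pending = new Set<string>();
  for (const event of events) {
    for (const id of event.longRunningToolIds ?? []) pending.add(id);
    for (const part of event.content?.parts ?? []) {
      const id = part.functionResponse?.id;
      if (id !== undefined) pending.delete(id);
    }
  }
  return pending;
}

const stillRunning = pendingLongRunningIds([
  {
    content: {
      parts: [
        { functionCall: { name: 'process_large_dataset', id: 'call_456' } },
        { functionCall: { name: 'export_report', id: 'call_789' } },
      ],
    },
    longRunningToolIds: new Set(['call_456', 'call_789']),
  },
  { content: { parts: [{ functionResponse: { name: 'process_large_dataset', id: 'call_456' } }] } },
]);
```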

Multi-Agent Branching

The branch field tracks agent hierarchy:
// Parent agent event
const parentEvent = new Event({
  author: 'MainAgent',
  branch: 'MainAgent',
  content: { parts: [{ text: 'Starting analysis...' }] },
});

// Child agent event
const childEvent = new Event({
  author: 'AnalysisAgent',
  branch: 'MainAgent.AnalysisAgent',
  content: { parts: [{ text: 'Analyzing data...' }] },
});

// Filter events by branch
const mainAgentEvents = session.events.filter(
  e => e.branch === 'MainAgent'
);
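The strict equality filter above returns only events emitted directly on the MainAgent branch. To include events from child agents as well, a prefix match on the dot-separated path can be used. This is a sketch that assumes branches are always dot-separated agent names, as in the examples above. isInBranch is a hypothetical helper.

```typescript
// True when eventBranch is the given branch or one of its descendants.
function isInBranch(eventBranch: string | undefined, root: string): boolean {
  if (eventBranch === undefined) return false;
  // The trailing dot prevents 'MainAgentX' from matching 'MainAgent'.
  return eventBranch === root || eventBranch.startsWith(root + '.');
}

const branches = ['MainAgent', 'MainAgent.AnalysisAgent', 'MainAgentX', undefined];
const inMainAgent = branches.filter((b) => isInBranch(b, 'MainAgent'));
```

With real events the call would look like session.events.filter(e => isInBranch(e.branch, 'MainAgent')).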

Event Manipulation

Creating Events

import { Event } from '@iqai/adk';

const event = new Event({
  author: 'MyAgent',
  invocationId: 'inv_123',
  content: {
    parts: [
      { text: 'Processing your request...' },
    ]
  },
  timestamp: Math.floor(Date.now() / 1000),
  id: Event.newId(), // Generate unique ID
});

Modifying Events

// Add usage metadata
event.usageMetadata = {
  promptTokenCount: 100,
  candidatesTokenCount: 50,
  totalTokenCount: 150,
};

// Add cache metadata
event.cacheMetadata = {
  cacheName: 'context_cache_1',
  fingerprint: 'abc123',
  contentsCount: 5,
};

// Mark as partial
event.partial = true;

Event Queries

// Check if final
if (event.isFinalResponse()) {
  console.log('This is the last event');
}

// Extract function calls
const functionCalls = event.getFunctionCalls();
if (functionCalls.length > 0) {
  console.log('Agent wants to call:', functionCalls.map(fc => fc.name));
}

// Extract function responses
const functionResponses = event.getFunctionResponses();

// Check for code execution results
if (event.hasTrailingCodeExecutionResult()) {
  console.log('Event contains code execution output');
}

Express.js Integration

import express from 'express';
import { AgentBuilder, StreamingMode } from '@iqai/adk';

const app = express();

app.get('/api/agent/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  
  const agent = new AgentBuilder()
    .withModel('gpt-4')
    .buildLlm();
  
  try {
    for await (const event of agent.runAsync(context, {
      streamingMode: StreamingMode.SSE,
    })) {
      // Send event to client
      res.write(`data: ${JSON.stringify({
        id: event.id,
        author: event.author,
        text: event.text,
        partial: event.partial,
        timestamp: event.timestamp,
      })}\n\n`);
      
      if (event.isFinalResponse()) {
        res.write('data: [DONE]\n\n');
        res.end();
        break;
      }
    }
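  } catch (error) {
    // error is typed unknown in strict TypeScript, so narrow it before reading .message
    const message = error instanceof Error ? error.message : String(error);
    res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
    res.end();
  }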
});

Next.js API Route

// app/api/agent/stream/route.ts
import { AgentBuilder, StreamingMode } from '@iqai/adk';

export async function POST(request: Request) {
  const { message } = await request.json();
  
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      const agent = new AgentBuilder()
        .withModel('gpt-4')
        .buildLlm();
      
      try {
        for await (const event of agent.runAsync(context, {
          streamingMode: StreamingMode.SSE,
        })) {
          const data = `data: ${JSON.stringify(event)}\n\n`;
          controller.enqueue(encoder.encode(data));
          
          if (event.isFinalResponse()) {
            controller.close();
            break;
          }
        }
      } catch (error) {
        controller.error(error);
      }
    },
  });
  
  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}
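The route above pushes every event from inside start(), which keeps producing even when the client reads slowly or goes away. A pull-based variant only asks the generator for the next event when the consumer is ready, and stops the generator when the stream is cancelled. The helper below is a sketch built on the standard ReadableStream and TextEncoder APIs. toSseStream is a hypothetical name and is not part of ADK-TS.

```typescript
// Wraps any async iterable in a byte stream of SSE frames.
function toSseStream<T>(events: AsyncIterable<T>): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const iterator = events[Symbol.asyncIterator]();

  return new ReadableStream<Uint8Array>({
    // Called whenever the consumer has room for more data.
    async pull(controller) {
      const { done, value } = await iterator.next();
      if (done) {
        controller.close();
        return;
      }
      controller.enqueue(encoder.encode(`data: ${JSON.stringify(value)}\n\n`));
    },
    // Called when the consumer cancels, for example on client disconnect.
    async cancel() {
      await iterator.return?.();
    },
  });
}
```

In a route handler this would be used as new Response(toSseStream(agent.runAsync(...)), { headers }) with the same headers as above.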

WebSocket Alternative

import { WebSocketServer } from 'ws';
import { AgentBuilder } from '@iqai/adk';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws) => {
  ws.on('message', async (data) => {
    const { message } = JSON.parse(data.toString());
    
    const agent = new AgentBuilder()
      .withModel('gpt-4')
      .buildLlm();
    
    for await (const event of agent.runAsync(context)) {
      ws.send(JSON.stringify(event));
      
      if (event.isFinalResponse()) {
        break;
      }
    }
  });
});

Event Storage

// Append each event to the session history
// (skip the push if your runner or session service already persists events)
for await (const event of agent.runAsync(context)) {
  context.session.events.push(event);
}

// Retrieve conversation history
const history = context.session.events;

// Filter by author
const userMessages = history.filter(e => e.author === 'user');
const agentResponses = history.filter(e => e.author === 'MyAgent');

// Get latest event
const latestEvent = history[history.length - 1];
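Session history grows with every turn, which is what the Memory Limits tip under Performance Tips refers to. A simple way to cap it is to keep only the most recent events. The sketch below uses a hypothetical trimHistory helper and a plain array rather than a real session object.

```typescript
// Returns at most maxEvents of the most recent entries, oldest first.
function trimHistory<T>(events: T[], maxEvents: number): T[] {
  // Guard against slice(-0), which would return the whole array.
  if (maxEvents <= 0) return [];
  return events.length > maxEvents ? events.slice(-maxEvents) : events;
}

const recent = trimHistory(['e1', 'e2', 'e3', 'e4', 'e5'], 3);
```

A plain cut like this can separate a function call from its response, so a production version would probably trim on turn boundaries instead.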

Performance Tips

  1. Buffer Partial Events: Accumulate small chunks before sending
  2. Compress Streams: Use gzip for large event payloads
  3. Event Batching: Group non-partial events when possible
  4. Connection Management: Handle client disconnections gracefully
  5. Memory Limits: Clear old events from sessions periodically
// Buffer small chunks
let buffer = '';
const MIN_CHUNK_SIZE = 50;

for await (const event of agent.runAsync(context, {
  streamingMode: StreamingMode.SSE,
})) {
  if (event.partial) {
    buffer += event.text || '';
    
    if (buffer.length >= MIN_CHUNK_SIZE) {
      sendToClient(buffer);
      buffer = '';
    }
  } else {
    if (buffer) {
      sendToClient(buffer);
      buffer = '';
    }
    sendToClient(event.text || '');
  }
}
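For the Connection Management tip, the agent loop should stop once the client has gone away. One approach is to wrap the event stream so that it ends when an AbortSignal fires. The wrapper below is a sketch. untilAborted is a hypothetical name, and it only checks the signal between events, so it does not interrupt a model call that is already in flight.

```typescript
// Yields items from source until the signal is aborted.
async function* untilAborted<T>(
  source: AsyncIterable<T>,
  signal: AbortSignal,
): AsyncGenerator<T> {
  for await (const item of source) {
    // Returning from inside for-await calls source.return(),
    // which runs the source generator's finally blocks.
    if (signal.aborted) return;
    yield item;
  }
}
```

In an Express handler the signal could come from an AbortController that is aborted in a req.on('close', ...) listener, with the loop iterating over untilAborted(agent.runAsync(...), controller.signal).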

Best Practices

Streaming Guidelines:
  1. Always handle partial events separately from complete events
  2. Check event.isFinalResponse() to know when the conversation ends
  3. Implement reconnection logic for dropped connections
  4. Set appropriate timeouts for long-running operations
  5. Clean up resources when stream completes or errors
Use StreamingMode.SSE for production applications. The NONE mode is suitable for batch processing or testing.
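For the reconnection guideline, browsers' EventSource reconnects on its own, but clients built on fetch or WebSocket need their own retry schedule. Exponential backoff with a cap is a common choice. The sketch below shows only the delay calculation. backoffDelayMs and its default values are illustrative assumptions, not ADK-TS settings.

```typescript
// Delay before reconnect attempt n (0-based): baseMs * 2^n, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const delays = [0, 1, 2, 3, 10].map((attempt) => backoffDelayMs(attempt));
```

Adding a small random jitter to each delay helps avoid many clients reconnecting at the same instant after a server restart.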

Debugging Events

import { Logger } from '@iqai/adk';

const logger = new Logger({ name: 'EventDebugger' });

for await (const event of agent.runAsync(context)) {
  logger.debug('Event received', {
    id: event.id,
    author: event.author,
    partial: event.partial,
    contentLength: event.text?.length || 0,
    hasFunctionCalls: event.getFunctionCalls().length > 0,
    isFinal: event.isFinalResponse(),
  });
}

Next Steps

Authentication

Secure tool access with authentication patterns

Telemetry

Monitor agents with OpenTelemetry integration
