Skip to main content

Overview

The BaseLlmFlow class is the abstract base class for all LLM-based agent execution flows in ADK-TS. It orchestrates the complete lifecycle of agent execution, including preprocessing, LLM calls, function execution, and postprocessing. Flows control how agents interact with LLMs, process function calls, and manage multi-step conversations through request and response processors.

Class Definition

import { BaseLlmFlow } from '@iqai/adk';

Architecture

Flow Lifecycle

Each agent execution follows this lifecycle:
  1. Preprocessing - Prepare LLM request with tools, context, and instructions
  2. LLM Call - Send request to the language model
  3. Postprocessing - Handle response, execute functions, manage transfers
  4. Iteration - Repeat until final response

Processor Pipeline

BaseLlmFlow uses two processor arrays:
requestProcessors
Array<BaseLlmRequestProcessor>
Processors that transform the LLM request before sending to the model. Run in sequence during preprocessing.
responseProcessors
Array<BaseLlmResponseProcessor>
Processors that handle LLM responses after receiving from the model. Run in sequence during postprocessing.

Key Methods

runAsync()

Runs the agent flow asynchronously, yielding events as they occur.
async *runAsync(invocationContext: InvocationContext): AsyncGenerator<Event>
Parameters:
invocationContext
InvocationContext
required
Context object containing agent, session, and execution configuration
Yields: Event objects representing each step of the agent execution Example:
import { InvocationContext } from '@iqai/adk';

const invocationContext = new InvocationContext({
  agent: myAgent,
  session: session,
  runConfig: { streamingMode: StreamingMode.SSE }
});

for await (const event of flow.runAsync(invocationContext)) {
  console.log(`Event from ${event.author}:`, event.content);
  
  if (event.isFinalResponse()) {
    break;
  }
}

runLive()

Runs the agent in live mode with real-time interaction support.
async *runLive(invocationContext: InvocationContext): AsyncGenerator<Event>
Live mode is not fully implemented in the current version and delegates to runAsync().

Built-in Flow Implementations

SingleFlow

Handles tool calls without sub-agent support. Best for simple agents with tools.
import { SingleFlow } from '@iqai/adk';

const flow = new SingleFlow();
Features:
  • Basic request/response processing
  • Tool and function call execution
  • Code execution support
  • Natural language planning
  • Output schema validation
  • Context caching
Processors:
  • basicRequestProcessor - Core request setup
  • authRequestProcessor - Authentication handling
  • instructionsRequestProcessor - System instructions
  • identityRequestProcessor - Agent identity
  • contentRequestProcessor - Content preparation
  • contextCacheRequestProcessor - Context caching
  • nlPlanningRequestProcessor - Planning capabilities
  • codeExecutionRequestProcessor - Code execution setup
  • nlPlanningResponseProcessor - Planning response handling
  • outputSchemaResponseProcessor - Schema validation
  • codeExecutionResponseProcessor - Code execution results

AutoFlow

Extends SingleFlow with agent transfer capabilities.
import { AutoFlow } from '@iqai/adk';

const flow = new AutoFlow();
Features:
  • All SingleFlow features
  • Agent transfer support (parent → child, child → parent, peer → peer)
  • Multi-agent orchestration
Transfer Rules:
  1. Parent can transfer to sub-agent
  2. Sub-agent can transfer to parent
  3. Sub-agent can transfer to peer agents (if parent uses AutoFlow)

Creating Custom Flows

Extend BaseLlmFlow to create custom execution flows:
import { BaseLlmFlow } from '@iqai/adk';
import { customRequestProcessor } from './processors';

export class CustomFlow extends BaseLlmFlow {
  constructor() {
    super();
    
    // Add custom request processors
    this.requestProcessors.push(customRequestProcessor);
    
    // Add custom response processors
    this.responseProcessors.push(customResponseProcessor);
    
    this.logger.debug('CustomFlow initialized');
  }
}

Advanced Usage

Custom Request Processor

Create a processor to modify LLM requests:
import { 
  BaseLlmRequestProcessor,
  InvocationContext,
  LlmRequest,
  Event 
} from '@iqai/adk';

export class CustomRequestProcessor extends BaseLlmRequestProcessor {
  async *runAsync(
    invocationContext: InvocationContext,
    llmRequest: LlmRequest
  ): AsyncGenerator<Event> {
    // Modify the request before it's sent to the LLM
    llmRequest.config = llmRequest.config || {};
    llmRequest.config.temperature = 0.7;
    
    // Optionally yield events during preprocessing
    // (no events needed for this simple example)
  }
}

export const customRequestProcessor = new CustomRequestProcessor();

Custom Response Processor

Create a processor to handle LLM responses:
import { 
  BaseLlmResponseProcessor,
  InvocationContext,
  LlmResponse,
  Event 
} from '@iqai/adk';

export class CustomResponseProcessor extends BaseLlmResponseProcessor {
  async *runAsync(
    invocationContext: InvocationContext,
    llmResponse: LlmResponse
  ): AsyncGenerator<Event> {
    // Process the response after receiving from LLM
    if (llmResponse.content) {
      console.log('Received response:', llmResponse.content);
    }
    
    // Optionally yield events during postprocessing
    // (no events needed for this simple example)
  }
}

export const customResponseProcessor = new CustomResponseProcessor();

Flow with Callbacks

Handle before/after model callbacks:
import { AgentBuilder, CallbackContext } from '@iqai/adk';

const agent = new AgentBuilder()
  .withName('CallbackAgent')
  .withModel('gpt-4')
  .withBeforeModelCallback((context: CallbackContext) => {
    console.log('About to call model');
    // Return LlmResponse to bypass model call, or undefined to continue
    return undefined;
  })
  .withAfterModelCallback((context: CallbackContext) => {
    console.log('Received model response');
    // Return LlmResponse to replace response, or undefined to use original
    return undefined;
  })
  .buildLlm();

Monitoring Flow Execution

Track flow execution with detailed logging:
import { Logger } from '@iqai/adk';

const logger = new Logger({ name: 'FlowMonitor' });

for await (const event of flow.runAsync(invocationContext)) {
  logger.debug(`Step: ${invocationContext.llmCallCount}`);
  logger.debug(`Author: ${event.author}`);
  logger.debug(`Event ID: ${event.id}`);
  
  const functionCalls = event.getFunctionCalls();
  if (functionCalls.length > 0) {
    logger.debug(`Function calls: ${functionCalls.map(f => f.name).join(', ')}`);
  }
  
  if (event.actions.transferToAgent) {
    logger.debug(`Transfer to: ${event.actions.transferToAgent}`);
  }
  
  if (event.isFinalResponse()) {
    logger.debug('Flow completed');
  }
}

Flow Execution Details

Step Iteration

The flow continues executing steps until a final response is reached:
while (true) {
  // Execute one step
  for await (const event of this._runOneStepAsync(invocationContext)) {
    yield event;
  }
  
  // Check if final response
  if (lastEvent?.isFinalResponse()) {
    break;
  }
}

Function Call Handling

When the LLM returns function calls, the flow:
  1. Extracts function calls from the event
  2. Executes each function with proper context
  3. Collects function responses
  4. Creates a function response event
  5. Continues execution if needed
const functionCalls = event.getFunctionCalls();
if (functionCalls.length > 0) {
  // Execute functions
  const responseEvent = await handleFunctionCallsAsync(
    invocationContext,
    event,
    toolsDict
  );
  
  yield responseEvent;
  
  // Handle agent transfer if requested
  if (responseEvent.actions?.transferToAgent) {
    yield* transferredAgent.runAsync(invocationContext);
  }
}

Telemetry and Metrics

Flows automatically track:
  • LLM call count and duration
  • Token usage (prompt and completion tokens)
  • Function execution time
  • Error rates and types
// Metrics are automatically recorded
telemetryService.recordLlmCall({
  model: llm.model,
  agentName: invocationContext.agent.name,
  environment: process.env.NODE_ENV,
  status: 'success'
});

telemetryService.recordLlmTokens(
  promptTokens,
  completionTokens,
  { model: llm.model }
);

Error Handling

Flows handle errors at multiple levels:
try {
  for await (const llmResponse of llm.generateContentAsync(request)) {
    yield llmResponse;
  }
} catch (error) {
  // Try plugin error recovery
  const recoveryResponse = await pluginManager.runOnModelErrorCallback({
    callbackContext,
    llmRequest,
    error
  });
  
  if (recoveryResponse) {
    yield recoveryResponse;
    return;
  }
  
  // Re-throw if no recovery
  throw error;
}

Configuration

Streaming Mode

Control response streaming:
import { StreamingMode } from '@iqai/adk';

const invocationContext = new InvocationContext({
  agent: myAgent,
  session: session,
  runConfig: {
    streamingMode: StreamingMode.SSE  // or StreamingMode.NONE
  }
});

Tool Deduplication

Flows automatically deduplicate tools by name to prevent errors:
// Tools with duplicate names are filtered
const uniqueTools = Array.from(
  new Map(tools.map(t => [t.name, t])).values()
);

Type Reference

InvocationContext

Context passed through flow execution:
interface InvocationContext {
  agent: BaseAgent;
  session: Session;
  runConfig: RunConfig;
  invocationId: string;
  branch?: string;
  pluginManager: PluginManager;
  
  incrementLlmCallCount(): void;
  nextLlmSpanIndex(): number;
}

RunConfig

Configuration for flow execution:
interface RunConfig {
  streamingMode: StreamingMode;
  supportCfc?: boolean;  // Continuous Function Calling
}

Event

Learn about events in agent flows

EventActions

Understand event actions and state management

AgentBuilder

Create agents that use flows

Tools

Build custom tools for agents

Source Code

View the source: base-llm-flow.ts

Build docs developers (and LLMs) love