
Overview

The ChainExecutor trait enables multi-step LLM reasoning pipelines. A Chain is a sequence of steps that can:
  • Call LLMs with context from previous steps
  • Search memory for relevant data
  • Transform and format data
  • Emit events to the event bus
  • Call registered tools with parameters
Steps execute sequentially, with each step’s output becoming the input for the next step. Source: crates/oneclaw-core/src/orchestrator/chain.rs

Core Types

Chain

A complete chain definition. Location: chain.rs:122-141
pub struct Chain {
    /// The name of this chain
    pub name: String,
    /// The ordered list of steps in this chain
    pub steps: Vec<ChainStep>,
}
Constructor:
pub fn new(name: impl Into<String>) -> Self
Builder method:
pub fn add_step(mut self, step: ChainStep) -> Self
Example:
let chain = Chain::new("analyze-sensor")
    .add_step(ChainStep::memory_search("gather", "{input}", 5))
    .add_step(ChainStep::llm("analyze", "Analyze: {input}\nData: {step_0}"))
    .add_step(ChainStep::transform("format", "Report:\n{input}"));

ChainStep

A single step in a chain. Location: chain.rs:57-119
pub struct ChainStep {
    /// The name of this step
    pub name: String,
    /// The action to perform in this step
    pub action: StepAction,
}
Helper constructors:

llm()

Location: chain.rs:67-76 Creates an LLM call step with default max_tokens (300) and temperature (0.3).
pub fn llm(name: impl Into<String>, prompt: impl Into<String>) -> Self

memory_search()

Location: chain.rs:79-87 Creates a memory search step.
pub fn memory_search(
    name: impl Into<String>,
    query: impl Into<String>,
    limit: usize
) -> Self

transform()

Location: chain.rs:90-97 Creates a transform/formatting step.
pub fn transform(
    name: impl Into<String>,
    template: impl Into<String>
) -> Self

emit_event()

Location: chain.rs:100-107 Creates an event emission step.
pub fn emit_event(
    name: impl Into<String>,
    topic: impl Into<String>
) -> Self

tool_call()

Location: chain.rs:110-119 Creates a tool invocation step.
pub fn tool_call(
    name: impl Into<String>,
    tool_name: impl Into<String>,
    params: HashMap<String, String>
) -> Self

StepAction

Defines what a chain step does. Location: chain.rs:16-54
pub enum StepAction {
    /// Call LLM with given prompt template
    LlmCall {
        prompt_template: String,
        max_tokens: u32,
        temperature: f32,
    },
    
    /// Search memory with query derived from previous step
    MemorySearch {
        query_template: String,
        limit: usize,
    },
    
    /// Format/transform output using a template
    Transform {
        template: String,
    },
    
    /// Emit an event to the bus
    EmitEvent {
        topic: String,
    },
    
    /// Call a registered tool with params
    ToolCall {
        tool_name: String,
        params: HashMap<String, String>,
    },
}
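The helper constructors cover the common cases, but because the fields are public, a ChainStep can also be built with an explicit StepAction — for example, to override llm()'s defaults of max_tokens = 300 and temperature = 0.3. A minimal, self-contained sketch (it redeclares just the LlmCall variant locally so it compiles standalone; the real definitions live in chain.rs):

```rust
// Local copies of just the pieces needed, so this sketch compiles standalone;
// the real definitions live in crates/oneclaw-core/src/orchestrator/chain.rs.
pub enum StepAction {
    LlmCall {
        prompt_template: String,
        max_tokens: u32,
        temperature: f32,
    },
}

pub struct ChainStep {
    pub name: String,
    pub action: StepAction,
}

fn main() {
    // An LLM step with a larger budget than llm()'s defaults (300 / 0.3).
    let step = ChainStep {
        name: "deep-analysis".to_string(),
        action: StepAction::LlmCall {
            prompt_template: "Analyze in depth: {input}".to_string(),
            max_tokens: 800,
            temperature: 0.7,
        },
    };

    match &step.action {
        StepAction::LlmCall { max_tokens, temperature, .. } => {
            println!("{}: max_tokens={} temperature={}", step.name, max_tokens, temperature);
        }
    }
}
```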

StepResult

Result of executing one step. Location: chain.rs:144-152
pub struct StepResult {
    /// The name of the executed step
    pub step_name: String,
    /// The output produced by this step
    pub output: String,
    /// The execution time of this step in milliseconds
    pub latency_ms: u64,
}

ChainResult

Result of executing an entire chain. Location: chain.rs:155-165
pub struct ChainResult {
    /// The name of the executed chain
    pub chain_name: String,
    /// The results of each step in execution order
    pub steps: Vec<StepResult>,
    /// The output of the last step in the chain
    pub final_output: String,
    /// The total execution time of the chain in milliseconds
    pub total_latency_ms: u64,
}

ChainContext

Context passed to chain execution - provides access to LLM, memory, event bus, and tools. Location: chain.rs:168-179
pub struct ChainContext<'a> {
    /// The v1.5 provider for making LLM calls (None = offline)
    pub provider: Option<&'a dyn Provider>,
    /// The memory store for search steps
    pub memory: &'a dyn Memory,
    /// The event bus for emitting events
    pub event_bus: &'a dyn EventBus,
    /// The system prompt for LLM calls
    pub system_prompt: &'a str,
    /// The optional tool registry for tool call steps
    pub tool_registry: Option<&'a ToolRegistry>,
}

ChainExecutor Trait

Core trait for executing chains. Location: chain.rs:182-186
#[async_trait]
pub trait ChainExecutor: Send + Sync {
    /// Execute a chain with the given initial input and context
    async fn execute(
        &self,
        chain: &Chain,
        initial_input: &str,
        context: &ChainContext<'_>
    ) -> Result<ChainResult>;
}

Methods

execute()

Executes a chain with the given initial input and context. Parameters:
  • chain: &Chain - The chain to execute
  • initial_input: &str - Initial input for the first step
  • context: &ChainContext<'_> - Execution context with provider, memory, event bus, tools
Returns:
  • Result<ChainResult> - Results of all steps plus final output and latency
Example:
let executor = DefaultChainExecutor::new();
let result = executor.execute(&chain, "Check sensor_01", &context).await?;

println!("Chain: {}", result.chain_name);
println!("Steps executed: {}", result.steps.len());
println!("Final output: {}", result.final_output);
println!("Total latency: {}ms", result.total_latency_ms);

DefaultChainExecutor

Default implementation that actually runs steps. Location: chain.rs:217-359
pub struct DefaultChainExecutor;

Constructor

pub fn new() -> Self
Example:
let executor = DefaultChainExecutor::new();

Execution Logic

Location: chain.rs:229-359 The executor runs steps sequentially:
  1. Initialize state:
    • current_input = initial input
    • step_outputs = HashMap tracking each step’s output
    • step_results = Vector of StepResult
  2. For each step:
    • Execute action (LLM call, memory search, transform, event, tool call)
    • Record output and latency
    • Update current_input with step output
    • Store output in step_outputs[index]
  3. Return ChainResult with all step results and final output
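The loop above can be mirrored in a simplified, self-contained sketch. This handles only transform-style steps (the real executor in chain.rs also dispatches LLM, memory, event, and tool actions), but it shows the state threading: each output is recorded under its step index and becomes the next step's input.

```rust
use std::collections::HashMap;

// Simplified step: only a transform template, for illustration.
struct Step {
    name: String,
    template: String,
}

struct StepResult {
    step_name: String,
    output: String,
}

fn run_chain(steps: &[Step], initial_input: &str) -> (Vec<StepResult>, String) {
    let mut current_input = initial_input.to_string();
    let mut step_outputs: HashMap<usize, String> = HashMap::new();
    let mut results = Vec::new();

    for (i, step) in steps.iter().enumerate() {
        // Substitute {input} and {step_N}, as the real executor does.
        let mut output = step.template.replace("{input}", &current_input);
        for (n, prev) in &step_outputs {
            output = output.replace(&format!("{{step_{}}}", n), prev);
        }

        results.push(StepResult {
            step_name: step.name.clone(),
            output: output.clone(),
        });
        step_outputs.insert(i, output.clone());
        current_input = output; // each step's output feeds the next step
    }

    (results, current_input)
}

fn main() {
    let steps = vec![
        Step { name: "wrap".into(), template: "DATA: {input}".into() },
        Step { name: "report".into(), template: "Report from {step_0}".into() },
    ];
    let (results, final_output) = run_chain(&steps, "sensor_01");
    assert_eq!(results[0].output, "DATA: sensor_01");
    assert_eq!(final_output, "Report from DATA: sensor_01");
    println!("{}", final_output);
}
```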

Step Execution Details

LLM Call Step

Location: chain.rs:246-264
StepAction::LlmCall { prompt_template, .. } => {
    let prompt = substitute_template(prompt_template, &current_input, &step_outputs);
    
    match &ctx.provider {
        Some(provider) => {
            match provider.chat(ctx.system_prompt, &prompt) {
                Ok(resp) => resp.content,
                Err(e) => format!("[Error] Step '{}' failed: {}", step.name, e),
            }
        }
        None => format!("[Offline] No provider configured for step '{}'", step.name),
    }
}
  • Substitutes {input} and {step_N} in prompt template
  • Calls provider with system prompt and assembled prompt
  • Returns LLM response or error message
  • Handles offline mode gracefully

Memory Search Step

Location: chain.rs:266-286
StepAction::MemorySearch { query_template, limit } => {
    let query_text = substitute_template(query_template, &current_input, &step_outputs);
    let query = MemoryQuery::new(&query_text).with_limit(*limit);
    
    match ctx.memory.search(&query) {
        Ok(entries) if entries.is_empty() => {
            "Không tìm thấy dữ liệu liên quan.".to_string() // "No relevant data found."
        }
        Ok(entries) => {
            let mut result = String::new();
            for entry in &entries {
                result.push_str(&format!(
                    "[{}] {}\n",
                    entry.created_at.format("%d/%m %H:%M"),
                    entry.content,
                ));
            }
            result
        }
        Err(e) => format!("[Memory error] {}", e),
    }
}
  • Substitutes variables in query template
  • Searches memory with limit
  • Formats results with timestamp and content
  • Returns a hard-coded Vietnamese message ("No relevant data found.") when the search returns no entries

Transform Step

Location: chain.rs:288-290
StepAction::Transform { template } => {
    substitute_template(template, &current_input, &step_outputs)
}
  • Substitutes {input} and {step_N} placeholders
  • Returns formatted output

Emit Event Step

Location: chain.rs:292-299
StepAction::EmitEvent { topic } => {
    let event = Event::new(topic.clone(), format!("chain:{}", chain.name))
        .with_data("chain", &chain.name)
        .with_data("step", &step.name)
        .with_data("output", &current_input);
    let _ = ctx.event_bus.publish(event);
    current_input.clone() // Pass through
}
  • Creates event with chain/step metadata
  • Publishes to event bus
  • Passes current input through unchanged

Tool Call Step

Location: chain.rs:301-325
StepAction::ToolCall { tool_name, params } => {
    if let Some(registry) = ctx.tool_registry {
        // Substitute template variables in param values
        let resolved_params: HashMap<String, String> = params.iter()
            .map(|(k, v)| (k.clone(), substitute_template(v, &current_input, &step_outputs)))
            .collect();
        
        match registry.execute(tool_name, &resolved_params, Some(ctx.event_bus)) {
            Ok(result) => {
                if result.success {
                    result.output
                } else {
                    format!("[Tool failed] {}: {}", tool_name, result.output)
                }
            }
            Err(e) => format!("[Tool error] {}: {}", tool_name, e),
        }
    } else {
        format!("[No tools] Tool registry not available for '{}'", tool_name)
    }
}
  • Resolves template variables in all param values
  • Executes tool from registry
  • Returns tool output or error message
  • Handles missing registry gracefully

Template Substitution

Location: chain.rs:364-370 Chains use template substitution to pass data between steps:
fn substitute_template(
    template: &str,
    input: &str,
    step_outputs: &HashMap<usize, String>
) -> String {
    let mut result = template.replace("{input}", input);
    for (i, output) in step_outputs {
        result = result.replace(&format!("{{step_{}}}", i), output);
    }
    result
}
Placeholders:
  • {input} - Current input (output of previous step)
  • {step_0} - Output of step 0
  • {step_1} - Output of step 1
  • {step_N} - Output of step N
Example:
// Step 0 output: "sensor_01: 25.3°C"
// Step 1 input template: "Analyze: {step_0}\nContext: {input}"
// Result: "Analyze: sensor_01: 25.3°C\nContext: Check temperature"
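Since substitute_template depends only on std, it can be exercised standalone. Note the substitution order: {input} is expanded first, then the {step_N} loop runs over the whole result — so placeholders that arrive inside the input text are themselves substituted:

```rust
use std::collections::HashMap;

// Copy of the private helper from chain.rs, for standalone experimentation.
fn substitute_template(
    template: &str,
    input: &str,
    step_outputs: &HashMap<usize, String>,
) -> String {
    let mut result = template.replace("{input}", input);
    for (i, output) in step_outputs {
        result = result.replace(&format!("{{step_{}}}", i), output);
    }
    result
}

fn main() {
    let mut outputs = HashMap::new();
    outputs.insert(0, "sensor_01: 25.3°C".to_string());

    // The example from above, end to end.
    let prompt = substitute_template(
        "Analyze: {step_0}\nContext: {input}",
        "Check temperature",
        &outputs,
    );
    assert_eq!(prompt, "Analyze: sensor_01: 25.3°C\nContext: Check temperature");

    // Because {input} is expanded first, placeholders carried inside the
    // input string are substituted as well:
    let nested = substitute_template("{input}", "prev said {step_0}", &outputs);
    assert_eq!(nested, "prev said sensor_01: 25.3°C");
    println!("{}", prompt);
}
```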

NoopChainExecutor

Location: chain.rs:189-214 No-operation executor that returns input as output.
pub struct NoopChainExecutor;

impl NoopChainExecutor {
    pub fn new() -> Self { Self }
}

#[async_trait]
impl ChainExecutor for NoopChainExecutor {
    async fn execute(
        &self,
        chain: &Chain,
        initial_input: &str,
        _context: &ChainContext<'_>
    ) -> Result<ChainResult> {
        Ok(ChainResult {
            chain_name: chain.name.clone(),
            steps: vec![StepResult {
                step_name: "noop".into(),
                output: initial_input.to_string(),
                latency_ms: 0,
            }],
            final_output: initial_input.to_string(),
            total_latency_ms: 0,
        })
    }
}
Useful for testing chain construction without executing actual steps.

Usage Examples

Simple Analysis Chain

use oneclaw_core::orchestrator::{Chain, ChainStep, DefaultChainExecutor, ChainContext};

let chain = Chain::new("analyze-sensor")
    .add_step(ChainStep::memory_search(
        "gather-readings",
        "sensor readings {input}",
        10
    ))
    .add_step(ChainStep::llm(
        "analyze",
        "Analyze these sensor readings:\n{step_0}\n\nFocus on: {input}"
    ))
    .add_step(ChainStep::transform(
        "format-report",
        "Analysis Report\n================\n{input}"
    ));

let executor = DefaultChainExecutor::new();
let context = ChainContext {
    provider: Some(&provider),
    memory: &memory,
    event_bus: &event_bus,
    system_prompt: "You are OneClaw.",
    tool_registry: None,
};

let result = executor.execute(&chain, "temperature trends", &context).await?;
println!("Report:\n{}", result.final_output);

Alert Detection Chain

let alert_chain = Chain::new("detect-alert")
    .add_step(ChainStep::memory_search(
        "get-thresholds",
        "threshold settings {input}",
        5
    ))
    .add_step(ChainStep::llm(
        "check-alert",
        "Check if alert needed:\nCurrent: {input}\nThresholds: {step_0}"
    ))
    .add_step(ChainStep::emit_event(
        "notify",
        "sensor.alert"
    ))
    .add_step(ChainStep::transform(
        "format",
        "Alert Status: {step_1}"
    ));

let result = executor.execute(&alert_chain, "sensor_01: 35.2°C", &context).await?;

Tool Integration Chain

use std::collections::HashMap;

let mut tool_params = HashMap::new();
tool_params.insert("device_id".to_string(), "{input}".to_string());
tool_params.insert("action".to_string(), "restart".to_string());

let recovery_chain = Chain::new("auto-recovery")
    .add_step(ChainStep::memory_search(
        "check-history",
        "device failures {input}",
        5
    ))
    .add_step(ChainStep::llm(
        "decide-action",
        "Device: {input}\nHistory: {step_0}\nShould we restart?"
    ))
    .add_step(ChainStep::tool_call(
        "restart-device",
        "device_control",
        tool_params
    ))
    .add_step(ChainStep::emit_event(
        "log-recovery",
        "device.recovery"
    ));

let context_with_tools = ChainContext {
    provider: Some(&provider),
    memory: &memory,
    event_bus: &event_bus,
    system_prompt: "You are OneClaw.",
    tool_registry: Some(&tool_registry),
};

let result = executor.execute(&recovery_chain, "device_42", &context_with_tools).await?;

Multi-Step Data Pipeline

let pipeline = Chain::new("data-pipeline")
    .add_step(ChainStep::memory_search(
        "fetch-raw",
        "{input}",
        20
    ))
    .add_step(ChainStep::transform(
        "filter",
        "Filtered data from: {step_0}"
    ))
    .add_step(ChainStep::llm(
        "summarize",
        "Summarize this data:\n{input}"
    ))
    .add_step(ChainStep::llm(
        "generate-insights",
        "Based on this summary:\n{step_2}\n\nProvide 3 key insights."
    ))
    .add_step(ChainStep::emit_event(
        "publish-insights",
        "analytics.insights"
    ));

let result = executor.execute(&pipeline, "sensor data last 24h", &context).await?;

for (i, step) in result.steps.iter().enumerate() {
    println!("Step {}: {} ({}ms)", i, step.step_name, step.latency_ms);
}
println!("\nFinal insights:\n{}", result.final_output);
println!("\nTotal time: {}ms", result.total_latency_ms);

Offline Mode Handling

let context_offline = ChainContext {
    provider: None, // No LLM provider available
    memory: &memory,
    event_bus: &event_bus,
    system_prompt: "You are OneClaw.",
    tool_registry: None,
};

let chain_with_llm = Chain::new("needs-llm")
    .add_step(ChainStep::llm("analyze", "Analyze: {input}"));

let result = executor.execute(&chain_with_llm, "data", &context_offline).await?;
assert!(result.final_output.contains("[Offline]"));

Performance Monitoring

Chain execution provides detailed latency tracking:
let result = executor.execute(&chain, input, &context).await?;

// Per-step latency
for step in &result.steps {
    println!("{}: {}ms", step.step_name, step.latency_ms);
}

// Total latency
println!("Total: {}ms", result.total_latency_ms);

// Identify bottlenecks
let slowest = result.steps.iter()
    .max_by_key(|s| s.latency_ms)
    .unwrap();
println!("Slowest step: {} ({}ms)", slowest.step_name, slowest.latency_ms);

Error Handling

Chains handle errors gracefully:
  • LLM call fails: Returns [Error] Step 'name' failed: error_message
  • Memory search fails: Returns [Memory error] error_message
  • Tool call fails: Returns [Tool error] tool_name: error_message
  • Tool not found: Returns [Tool error] tool_name: error_message
  • No tool registry: Returns [No tools] Tool registry not available
  • Offline mode: Returns [Offline] No provider configured
Errors in a step do not halt execution; the error message becomes that step's output, and the chain continues with the remaining steps.

See Also

  • ModelRouter - Routes requests to appropriate models
  • ContextManager - Assembles prompts with memory context
  • Memory - Memory storage and search
  • EventBus - Event publishing and subscription
  • Tool - Tool registration and execution
  • Provider - LLM provider abstraction
