Skip to main content

Overview

The Executor is the main runtime orchestrator that drives complete AXON program execution. It coordinates model calls, validation, retry logic, memory operations, and tracing.
from axon.runtime import Executor, ModelClient, ModelResponse
import asyncio

class MyModelClient(ModelClient):
    async def call(self, system_prompt: str, user_prompt: str, **kwargs):
        """Minimal ModelClient: forward the prompts to your LLM and wrap the reply."""
        # Your LLM integration here
        return ModelResponse(content="Response from model")

client = MyModelClient()
executor = Executor(client=client)

# NOTE: top-level `await` only works in an async context (e.g. an asyncio
# REPL or inside an `async def`); in a plain script use asyncio.run(...).
result = await executor.execute(compiled_program)

if result.success:
    print("Program executed successfully")
    # One UnitResult per `run` statement; each holds its per-step results.
    for unit in result.unit_results:
        print(f"Flow: {unit.flow_name}")
        for step in unit.step_results:
            print(f"  {step.step_name}: {step.response.content}")
else:
    print(f"Execution failed: {result.unit_results[0].error}")

Class: Executor

Constructor

Executor(
    client: ModelClient,
    *,
    validator: SemanticValidator | None = None,
    retry_engine: RetryEngine | None = None,
    memory: MemoryBackend | None = None,
    tool_dispatcher: ToolDispatcher | None = None
)
client
ModelClient
required
The model client for LLM interaction
validator
SemanticValidator
Optional custom validator (defaults to SemanticValidator())
retry_engine
RetryEngine
Optional custom retry engine (defaults to RetryEngine())
memory
MemoryBackend
Optional memory backend (defaults to InMemoryBackend())
tool_dispatcher
ToolDispatcher
Optional tool dispatcher for use statements
Example:
from axon.runtime import Executor, InMemoryBackend
from axon.runtime.tools.dispatcher import ToolDispatcher

# Build the optional collaborators explicitly instead of relying on defaults.
client = MyModelClient()
memory = InMemoryBackend()
tools = ToolDispatcher()

# `client` is the only required argument; memory and tool_dispatcher
# override the Executor's built-in defaults.
executor = Executor(
    client=client,
    memory=memory,
    tool_dispatcher=tools
)

Methods

async execute(program: CompiledProgram) -> ExecutionResult

Execute a complete compiled AXON program.
program
CompiledProgram
required
The compiled program to execute
Returns: ExecutionResult with all outcomes and execution trace
# Must run inside an async context (`async def` or an asyncio REPL).
result = await executor.execute(compiled_program)

print(f"Success: {result.success}")
print(f"Duration: {result.duration_ms}ms")
print(f"Units executed: {len(result.unit_results)}")

# `trace` may be None, so guard before reading its events.
if result.trace:
    print(f"Total events: {len(result.trace.events)}")

ModelClient Protocol

The ModelClient protocol defines the interface between the runtime and LLM providers.

Interface

from typing import Any, Protocol  # Any is needed by the annotations below
from axon.runtime import ModelResponse

class ModelClient(Protocol):
    """Structural interface between the runtime and an LLM provider.

    Any object exposing a matching async ``call`` method satisfies this
    protocol; no inheritance is required.
    """

    async def call(
        self,
        system_prompt: str,
        user_prompt: str,
        *,
        tools: list[dict[str, Any]] | None = None,
        output_schema: dict[str, Any] | None = None,
        effort: str = "",
        failure_context: str = "",
    ) -> ModelResponse:
        """Send one prompt pair to the provider and return a normalized response.

        Args:
            system_prompt: System-level instructions (persona + anchors).
            user_prompt: User-level prompt (the step's ask).
            tools: Optional tool declarations in provider-native format.
            output_schema: Optional schema for structured response parsing.
            effort: Effort level hint ("low", "medium", "high", "max").
            failure_context: Previous failure reason for retry context injection.
        """
        ...
system_prompt
str
required
The system-level instructions (persona + anchors)
user_prompt
str
required
The user-level prompt (the step’s ask)
tools
list[dict]
Optional tool declarations in provider-native format
output_schema
dict
Optional output schema for structured response parsing
effort
str
Effort level hint (“low”, “medium”, “high”, “max”)
failure_context
str
Previous failure reason for retry context injection

Implementation Example: Anthropic

from anthropic import AsyncAnthropic
from axon.runtime import ModelClient, ModelResponse

class AnthropicClient(ModelClient):
    """ModelClient backed by Anthropic's Messages API.

    Uses the async client so that ``call`` awaits the network request
    instead of blocking the event loop (the synchronous ``Anthropic``
    client would stall every other coroutine while the request runs).
    """

    def __init__(self, api_key: str, model: str = "claude-3-5-sonnet-20241022"):
        self.client = AsyncAnthropic(api_key=api_key)
        self.model = model

    async def call(
        self,
        system_prompt: str,
        user_prompt: str,
        *,
        tools: list[dict] | None = None,
        output_schema: dict | None = None,
        effort: str = "",
        failure_context: str = "",
    ) -> ModelResponse:
        """Send the prompts to Anthropic and normalize the reply.

        NOTE(review): ``output_schema`` and ``effort`` are accepted for
        protocol compatibility but are not forwarded to the API here.
        """
        messages = [{"role": "user", "content": user_prompt}]

        # Surface the previous failure so the model can correct itself.
        if failure_context:
            messages.insert(0, {
                "role": "user",
                "content": f"Previous attempt failed: {failure_context}. Please try again."
            })

        kwargs = {
            "model": self.model,
            "system": system_prompt,
            "messages": messages,
            "max_tokens": 8000,
        }

        if tools:
            kwargs["tools"] = tools

        # Async request: does not block the event loop.
        response = await self.client.messages.create(**kwargs)

        # Concatenate only the text blocks; tool-use blocks are skipped.
        content = ""
        for block in response.content:
            if block.type == "text":
                content += block.text

        return ModelResponse(
            content=content,
            usage={
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens,
            },
            raw=response,
        )

Implementation Example: OpenAI

from openai import AsyncOpenAI
from axon.runtime import ModelClient, ModelResponse

class OpenAIClient(ModelClient):
    """ModelClient backed by OpenAI's async chat-completions API."""

    def __init__(self, api_key: str, model: str = "gpt-4o"):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def call(
        self,
        system_prompt: str,
        user_prompt: str,
        *,
        tools: list[dict] | None = None,
        output_schema: dict | None = None,
        effort: str = "",
        failure_context: str = "",
    ) -> ModelResponse:
        """Send the prompts to OpenAI and normalize the reply.

        NOTE(review): ``effort`` is accepted for protocol compatibility but
        is not forwarded to the API here.
        """
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        # Consistent with the Anthropic example: surface the previous
        # failure so the model can correct itself on retry.
        if failure_context:
            messages.insert(1, {
                "role": "user",
                "content": f"Previous attempt failed: {failure_context}. Please try again.",
            })

        kwargs = {
            "model": self.model,
            "messages": messages,
        }

        if tools:
            kwargs["tools"] = tools

        if output_schema:
            kwargs["response_format"] = {
                "type": "json_schema",
                "json_schema": output_schema,
            }

        response = await self.client.chat.completions.create(**kwargs)

        message = response.choices[0].message
        return ModelResponse(
            # message.content is None when the model replies with tool calls
            # only; ModelResponse.content is declared as str, so fall back
            # to an empty string.
            content=message.content or "",
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
            },
            raw=response,
        )

Response Types

ModelResponse

Normalized response from a model call.
class ModelResponse:
    """Normalized response from a model call."""

    # The textual content of the response.
    content: str
    # Parsed structured data, when an output schema was provided.
    structured: dict[str, Any] | None
    # Tool invocations returned by the model.
    tool_calls: list[dict[str, Any]]
    # Model-reported confidence (0.0-1.0), if available.
    confidence: float | None
    # Token usage statistics.
    usage: dict[str, int]
    # The raw provider response, kept for debugging.
    raw: Any
    
    def to_dict(self) -> dict[str, Any]:
        """Serialize this response to a plain dictionary."""
        ...
content
str
The textual content of the response
structured
dict
Parsed structured data (if output schema was provided)
tool_calls
list[dict]
Any tool invocations returned by the model
confidence
float
Model-reported confidence (0.0-1.0), if available
usage
dict
Token usage statistics
raw
Any
The raw provider response for debugging

ExecutionResult

Result of executing a complete AXON program.
class ExecutionResult:
    """Result of executing a complete AXON program."""

    # Results for each execution unit (run statement).
    unit_results: tuple[UnitResult, ...]
    # The complete semantic execution trace; may be None.
    trace: ExecutionTrace | None
    # Whether the entire program succeeded.
    success: bool
    # Total wall-clock time in milliseconds.
    duration_ms: float
    
    def to_dict(self) -> dict[str, Any]:
        """Serialize this result to a plain dictionary."""
        ...
unit_results
tuple[UnitResult]
Results for each execution unit (run statement)
trace
ExecutionTrace
The complete semantic execution trace
success
bool
Whether the entire program succeeded
duration_ms
float
Total wall-clock time in milliseconds

UnitResult

Result of executing a single execution unit.
class UnitResult:
    """Result of executing a single execution unit."""

    # Name of the flow this unit executed.
    flow_name: str
    # Results for each step in the unit.
    step_results: tuple[StepResult, ...]
    # Whether this unit succeeded.
    success: bool
    # Error description when the unit failed.
    error: str
    # Wall-clock time for the unit in milliseconds.
    duration_ms: float
    
    def to_dict(self) -> dict[str, Any]:
        """Serialize this result to a plain dictionary."""
        ...

StepResult

Result of executing a single step.
class StepResult:
    """Result of executing a single step."""

    # Name of the step.
    step_name: str
    # Normalized model response; may be None (presumably when the step
    # never produced one — confirm against the executor implementation).
    response: ModelResponse | None
    # Semantic validation outcome; may be None.
    validation: ValidationResult | None
    # Retry bookkeeping (e.g. attempt count); may be None.
    retry_info: RetryResult | None
    # Wall-clock time for the step in milliseconds.
    duration_ms: float
    
    def to_dict(self) -> dict[str, Any]:
        """Serialize this result to a plain dictionary."""
        ...

Error Handling

The executor automatically handles errors and retries using the retry engine:
from axon.runtime.runtime_errors import (
    AxonRuntimeError,
    ModelCallError,
    ValidationError,
    AnchorBreachError,
)

try:
    result = await executor.execute(compiled_program)
    
    # A returned result with success=False means the program failed after
    # the retry engine exhausted its attempts (no exception raised).
    if not result.success:
        # Execution failed after all retries
        unit = result.unit_results[0]
        print(f"Error in {unit.flow_name}: {unit.error}")
except AxonRuntimeError as e:
    # AxonRuntimeError is the common base for runtime failures.
    print(f"Runtime error: {e.message}")
    print(f"  Type: {e.error_type}")
    print(f"  Context: {e.context}")

Execution Features

Automatic Retry

The executor automatically retries failed steps with configurable backoff:
# In AXON:
flow Process(doc: Document) -> Report {
    step Extract {
        ask: "Extract facts"
        
        refine {
            max_attempts: 3
            backoff: exponential
            pass_failure_context: true
        }
    }
}

Anchor Enforcement

The executor checks all active anchors after each model response:
# Anchor breach triggers automatic retry
result = await executor.execute(compiled_program)

for unit in result.unit_results:
    for step in unit.step_results:
        # retry_info may be None, so guard before reading attempts.
        if step.retry_info and step.retry_info.attempts > 1:
            print(f"Step {step.step_name} was retried {step.retry_info.attempts} times")

Semantic Validation

Output types and confidence floors are automatically validated:
# Inspect validation outcomes; `validation` may be None, so guard first.
for unit in result.unit_results:
    for step in unit.step_results:
        if step.validation and not step.validation.is_valid:
            print(f"Validation failed for {step.step_name}:")
            for violation in step.validation.violations:
                print(f"  - {violation.message}")

Tracing

Every semantic event is recorded in the execution trace:
result = await executor.execute(compiled_program)

# `trace` may be None, so guard before iterating its events.
if result.trace:
    print(f"Trace events: {len(result.trace.events)}")
    
    for event in result.trace.events:
        print(f"{event.timestamp}: {event.event_type.value}")
        # step_name and data are optional per event.
        if event.step_name:
            print(f"  Step: {event.step_name}")
        if event.data:
            print(f"  Data: {event.data}")

Complete Example

import asyncio
from axon import Lexer, Parser, TypeChecker, IRGenerator, get_backend
from axon.runtime import Executor, ModelClient, ModelResponse
from anthropic import Anthropic

class AnthropicClient(ModelClient):
    """Minimal Anthropic-backed ModelClient for the complete example."""

    def __init__(self, api_key: str):
        self.client = Anthropic(api_key=api_key)
    
    async def call(self, system_prompt: str, user_prompt: str, **kwargs):
        """Send the prompts to Anthropic and wrap the first text block.

        The default Anthropic client is synchronous, so run the request in
        a worker thread with asyncio.to_thread — calling it directly inside
        an async method would block the event loop for the whole request.
        """
        response = await asyncio.to_thread(
            self.client.messages.create,
            model="claude-3-5-sonnet-20241022",
            system=system_prompt,
            messages=[{"role": "user", "content": user_prompt}],
            max_tokens=8000,
        )
        return ModelResponse(content=response.content[0].text)

async def run_axon_program(source: str, api_key: str):
    """Compile an AXON source string and execute it with an Anthropic client.

    Prints type-check errors (and returns early) if compilation fails;
    otherwise prints a per-flow, per-step report of the execution outcome.
    """
    # --- Compile: lex -> parse -> type-check -> IR -> backend ---
    tokens = Lexer(source).tokenize()
    ast = Parser(tokens).parse()

    type_errors = TypeChecker(ast).check()
    if type_errors:
        for err in type_errors:
            print(f"Error: {err.message}")
        return

    ir_program = IRGenerator().generate(ast)
    compiled = get_backend("anthropic").compile_program(ir_program)

    # --- Execute ---
    executor = Executor(client=AnthropicClient(api_key=api_key))
    result = await executor.execute(compiled)

    # --- Report: failure branch first as a guard clause ---
    if not result.success:
        print("✗ Execution failed")
        for unit in result.unit_results:
            if not unit.success:
                print(f"  {unit.flow_name}: {unit.error}")
        return

    print("✓ Execution completed successfully")
    for unit in result.unit_results:
        print(f"\nFlow: {unit.flow_name}")
        for step in unit.step_results:
            print(f"  Step {step.step_name}:")
            print(f"    {step.response.content[:200]}...")

# Usage: `await` is only valid inside an async context, so in a plain
# script the coroutine must be driven with asyncio.run(). The file handle
# is closed deterministically via the context manager.
with open("my_program.axon") as f:
    source = f.read()

asyncio.run(run_axon_program(source, api_key="..."))

Next Steps

Context API

Manage execution state between steps

Tools API

Build custom runtime tools

Memory API

Implement semantic memory storage

Build docs developers (and LLMs) love