Overview
The Executor is the main runtime orchestrator that drives complete AXON program execution. It coordinates model calls, validation, retry logic, memory operations, and tracing.
import asyncio

from axon.runtime import Executor, ModelClient, ModelResponse


class MyModelClient(ModelClient):
    """Minimal ModelClient implementation for the quick-start example."""

    async def call(self, system_prompt: str, user_prompt: str, **kwargs):
        # Your LLM integration here
        return ModelResponse(content="Response from model")


async def main(compiled_program):
    """Execute a compiled AXON program and print a per-step summary."""
    client = MyModelClient()
    executor = Executor(client=client)
    result = await executor.execute(compiled_program)

    if result.success:
        print("Program executed successfully")
        for unit in result.unit_results:
            print(f"Flow: {unit.flow_name}")
            for step in unit.step_results:
                print(f"  {step.step_name}: {step.response.content}")
    else:
        # A failed run still carries per-unit error details.
        print(f"Execution failed: {result.unit_results[0].error}")


# `await` is only valid inside a coroutine, so drive the example with asyncio.run.
asyncio.run(main(compiled_program))
Class: Executor
Constructor
Executor(
client: ModelClient,
* ,
validator: SemanticValidator | None = None ,
retry_engine: RetryEngine | None = None ,
memory: MemoryBackend | None = None ,
tool_dispatcher: ToolDispatcher | None = None
)
The model client for LLM interaction
Optional custom validator (defaults to SemanticValidator())
Optional custom retry engine (defaults to RetryEngine())
Optional memory backend (defaults to InMemoryBackend())
Optional tool dispatcher for use statements
Example:
from axon.runtime import Executor, InMemoryBackend
from axon.runtime.tools.dispatcher import ToolDispatcher

# Wire an executor with an explicit memory backend and tool dispatcher.
client = MyModelClient()
memory = InMemoryBackend()
tools = ToolDispatcher()

executor = Executor(
    client=client,
    memory=memory,
    tool_dispatcher=tools,
)
Methods
async execute(program: CompiledProgram) -> ExecutionResult
Execute a complete compiled AXON program.
The compiled program to execute
Returns: ExecutionResult with all outcomes and execution trace
result = await executor.execute(compiled_program)
print ( f "Success: { result.success } " )
print ( f "Duration: { result.duration_ms } ms" )
print ( f "Units executed: { len (result.unit_results) } " )
if result.trace:
print ( f "Total events: { len (result.trace.events) } " )
ModelClient Protocol
The ModelClient protocol defines the interface between the runtime and LLM providers.
Interface
from typing import Any, Protocol

from axon.runtime import ModelResponse


class ModelClient(Protocol):
    """Structural interface the runtime expects from any LLM provider client."""

    async def call(
        self,
        system_prompt: str,  # system-level instructions (persona + anchors)
        user_prompt: str,    # the step's ask
        *,
        tools: list[dict[str, Any]] | None = None,    # tool declarations, provider-native format
        output_schema: dict[str, Any] | None = None,  # schema for structured response parsing
        effort: str = "",           # effort hint: "low", "medium", "high", "max"
        failure_context: str = "",  # previous failure reason for retry context injection
    ) -> ModelResponse:
        ...
The system-level instructions (persona + anchors)
The user-level prompt (the step’s ask)
Optional tool declarations in provider-native format
Optional output schema for structured response parsing
Effort level hint (“low”, “medium”, “high”, “max”)
Previous failure reason for retry context injection
Implementation Example: Anthropic
from anthropic import AsyncAnthropic

from axon.runtime import ModelClient, ModelResponse


class AnthropicClient(ModelClient):
    """ModelClient implementation backed by the Anthropic Messages API."""

    def __init__(self, api_key: str, model: str = "claude-3-5-sonnet-20241022"):
        # Async client so `call` never blocks the executor's event loop.
        self.client = AsyncAnthropic(api_key=api_key)
        self.model = model

    async def call(
        self,
        system_prompt: str,
        user_prompt: str,
        *,
        tools: list[dict] | None = None,
        output_schema: dict | None = None,
        effort: str = "",
        failure_context: str = "",
    ) -> ModelResponse:
        messages = [{"role": "user", "content": user_prompt}]
        if failure_context:
            # Surface the previous failure ahead of the prompt so the retry can adapt.
            messages.insert(0, {
                "role": "user",
                "content": f"Previous attempt failed: {failure_context}. Please try again.",
            })

        kwargs = {
            "model": self.model,
            "system": system_prompt,
            "messages": messages,
            "max_tokens": 8000,
        }
        if tools:
            kwargs["tools"] = tools

        response = await self.client.messages.create(**kwargs)

        # Concatenate all text blocks; non-text (e.g. tool-use) blocks are skipped.
        content = "".join(
            block.text for block in response.content if block.type == "text"
        )

        return ModelResponse(
            content=content,
            usage={
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens,
            },
            raw=response,
        )
Implementation Example: OpenAI
from openai import AsyncOpenAI

from axon.runtime import ModelClient, ModelResponse


class OpenAIClient(ModelClient):
    """ModelClient implementation backed by the OpenAI chat completions API."""

    def __init__(self, api_key: str, model: str = "gpt-4o"):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def call(
        self,
        system_prompt: str,
        user_prompt: str,
        *,
        tools: list[dict] | None = None,
        output_schema: dict | None = None,
        effort: str = "",
        failure_context: str = "",
    ) -> ModelResponse:
        request = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        }
        if tools:
            request["tools"] = tools
        if output_schema:
            # Request structured JSON output conforming to the supplied schema.
            request["response_format"] = {
                "type": "json_schema",
                "json_schema": output_schema,
            }

        response = await self.client.chat.completions.create(**request)

        return ModelResponse(
            content=response.choices[0].message.content,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
            },
            raw=response,
        )
Response Types
ModelResponse
Normalized response from a model call.
class ModelResponse :
content: str
structured: dict[ str , Any] | None
tool_calls: list[dict[ str , Any]]
confidence: float | None
usage: dict[ str , int ]
raw: Any
def to_dict ( self ) -> dict[ str , Any]:
...
The textual content of the response
Parsed structured data (if output schema was provided)
Any tool invocations returned by the model
Model-reported confidence (0.0-1.0), if available
Token usage counts reported by the provider (e.g. input/output token totals)
The raw provider response for debugging
ExecutionResult
Result of executing a complete AXON program.
class ExecutionResult:
    """Outcome of executing a complete AXON program."""

    unit_results: tuple[UnitResult, ...]  # one result per execution unit (`run` statement)
    trace: ExecutionTrace | None          # complete semantic execution trace
    success: bool                         # whether the entire program succeeded
    duration_ms: float                    # total wall-clock time in milliseconds

    def to_dict(self) -> dict[str, Any]:
        ...
Results for each execution unit (run statement)
The complete semantic execution trace
Whether the entire program succeeded
Total wall-clock time in milliseconds
UnitResult
Result of executing a single execution unit.
class UnitResult:
    """Outcome of executing a single execution unit."""

    flow_name: str                        # name of the flow that was run
    step_results: tuple[StepResult, ...]  # one result per executed step
    success: bool                         # whether the unit succeeded
    error: str                            # error message when the unit failed
    duration_ms: float                    # execution time in milliseconds

    def to_dict(self) -> dict[str, Any]:
        ...
StepResult
Result of executing a single step.
class StepResult:
    """Outcome of executing a single step."""

    step_name: str                       # name of the step
    response: ModelResponse | None       # model response, if any
    validation: ValidationResult | None  # semantic validation outcome, if any
    retry_info: RetryResult | None       # retry details, if any
    duration_ms: float                   # execution time in milliseconds

    def to_dict(self) -> dict[str, Any]:
        ...
Error Handling
The executor automatically handles errors and retries using the retry engine:
from axon.runtime.runtime_errors import (
AxonRuntimeError,
ModelCallError,
ValidationError,
AnchorBreachError,
)
try :
result = await executor.execute(compiled_program)
if not result.success:
# Execution failed after all retries
unit = result.unit_results[ 0 ]
print ( f "Error in { unit.flow_name } : { unit.error } " )
except AxonRuntimeError as e:
print ( f "Runtime error: { e.message } " )
print ( f " Type: { e.error_type } " )
print ( f " Context: { e.context } " )
Execution Features
Automatic Retry
The executor automatically retries failed steps with configurable backoff:
# In AXON:
flow Process(doc: Document) -> Report {
    step Extract {
        ask: "Extract facts"
        refine {
            max_attempts: 3
            backoff: exponential
            pass_failure_context: true
        }
    }
}
Anchor Enforcement
The executor checks all active anchors after each model response:
# Anchor breach triggers automatic retry
result = await executor.execute(compiled_program)
for unit in result.unit_results:
for step in unit.step_results:
if step.retry_info and step.retry_info.attempts > 1 :
print ( f "Step { step.step_name } was retried { step.retry_info.attempts } times" )
Semantic Validation
Output types and confidence floors are automatically validated:
for unit in result.unit_results:
    for step in unit.step_results:
        validation = step.validation
        if validation and not validation.is_valid:
            # Report every recorded violation for the failing step.
            print(f"Validation failed for {step.step_name}:")
            for violation in validation.violations:
                print(f"  - {violation.message}")
Tracing
Every semantic event is recorded in the execution trace:
result = await executor.execute(compiled_program)
if result.trace:
print ( f "Trace events: { len (result.trace.events) } " )
for event in result.trace.events:
print ( f " { event.timestamp } : { event.event_type.value } " )
if event.step_name:
print ( f " Step: { event.step_name } " )
if event.data:
print ( f " Data: { event.data } " )
Complete Example
import asyncio

from anthropic import AsyncAnthropic

from axon import Lexer, Parser, TypeChecker, IRGenerator, get_backend
from axon.runtime import Executor, ModelClient, ModelResponse


class AnthropicClient(ModelClient):
    """Minimal Anthropic-backed ModelClient for the end-to-end example."""

    def __init__(self, api_key: str):
        # Async client so the executor's event loop is never blocked.
        self.client = AsyncAnthropic(api_key=api_key)

    async def call(self, system_prompt: str, user_prompt: str, **kwargs):
        response = await self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            system=system_prompt,
            messages=[{"role": "user", "content": user_prompt}],
            max_tokens=8000,
        )
        return ModelResponse(content=response.content[0].text)


async def run_axon_program(source: str, api_key: str):
    """Compile `source`, execute it, and print a per-step summary.

    Returns early (printing diagnostics) if type checking fails.
    """
    # Compile
    lexer = Lexer(source)
    parser = Parser(lexer.tokenize())
    ast = parser.parse()

    checker = TypeChecker(ast)
    errors = checker.check()
    if errors:
        for error in errors:
            print(f"Error: {error.message}")
        return

    ir_gen = IRGenerator()
    ir_program = ir_gen.generate(ast)
    backend = get_backend("anthropic")
    compiled = backend.compile_program(ir_program)

    # Execute
    client = AnthropicClient(api_key=api_key)
    executor = Executor(client=client)
    result = await executor.execute(compiled)

    # Output
    if result.success:
        print("✓ Execution completed successfully")
        for unit in result.unit_results:
            print(f"\nFlow: {unit.flow_name}")
            for step in unit.step_results:
                print(f"  Step {step.step_name}:")
                print(f"    {step.response.content[:200]}...")
    else:
        print("✗ Execution failed")
        for unit in result.unit_results:
            if not unit.success:
                print(f"  {unit.flow_name}: {unit.error}")


# Usage — `await` is only valid inside a coroutine, so drive it with asyncio.run.
def main():
    # Context manager ensures the source file is closed after reading.
    with open("my_program.axon", encoding="utf-8") as f:
        source = f.read()
    asyncio.run(run_axon_program(source, api_key="..."))


if __name__ == "__main__":
    main()
Next Steps
Context API Manage execution state between steps
Tools API Build custom runtime tools
Memory API Implement semantic memory storage