Runtime Executor
The Executor is the heart of Phase 3: it orchestrates the complete execution of compiled AXON programs. `CompiledProgram → [Executor] → ExecutionResult` (with trace, validation, retry)
Architecture Overview
The Executor coordinates six key subsystems:
- Model Client — LLM API calls (Anthropic, OpenAI, etc.)
- Context Manager — Per-unit state tracking
- Semantic Validator — Type contract enforcement
- Retry Engine — Adaptive retry with failure context
- Memory Backend — Persistent semantic storage
- Tracer — Complete execution observability
Implementation
from axon.backends.base_backend import CompiledProgram, CompiledExecutionUnit, CompiledStep
from axon.runtime.semantic_validator import SemanticValidator
from axon.runtime.retry_engine import RetryEngine
from axon.runtime.memory_backend import MemoryBackend, InMemoryBackend
from axon.runtime.tracer import Tracer, TraceEventType
from axon.runtime.context_mgr import ContextManager
class Executor:
    """Drives end-to-end execution of a compiled AXON program.

    Coordinates the model client, semantic validator, retry engine,
    memory backend, and (optionally) a tool dispatcher across every
    execution unit of the program.
    """
    def __init__(
        self,
        client: ModelClient,
        *,
        validator: SemanticValidator | None = None,
        retry_engine: RetryEngine | None = None,
        memory: MemoryBackend | None = None,
        tool_dispatcher: ToolDispatcher | None = None,
    ) -> None:
        # Every optional collaborator falls back to a sensible default;
        # the tool dispatcher alone may legitimately stay None.
        self._client = client
        self._tool_dispatcher = tool_dispatcher
        self._memory = memory or InMemoryBackend()
        self._retry_engine = retry_engine or RetryEngine()
        self._validator = validator or SemanticValidator()
    async def execute(self, program: CompiledProgram) -> ExecutionResult:
        """Run every execution unit of *program* sequentially and collect results."""
        tracer = Tracer(
            program_name=program.metadata.get("program_name", ""),
            backend_name=program.backend_name,
        )
        # Units run one after another; each gets its own trace span.
        results = [
            await self._execute_unit(unit, tracer)
            for unit in program.execution_units
        ]
        return ExecutionResult(
            unit_results=tuple(results),
            trace=tracer.finalize(),
            success=all(r.success for r in results),
        )
Model Client Protocol
The Executor is decoupled from LLM APIs via the `ModelClient` protocol:
from typing import Protocol
class ModelClient(Protocol):
    """Protocol for LLM model interaction.

    Any client used by the Executor must provide this single async
    method; concrete implementations adapt vendor SDKs to it.
    """
    async def call(
        self,
        system_prompt: str,
        user_prompt: str,
        *,
        tools: list[dict[str, Any]] | None = None,
        output_schema: dict[str, Any] | None = None,
        effort: str = "",
        failure_context: str = "",
    ) -> ModelResponse:
        """Send a prompt to the model and return the response.

        Args:
            system_prompt: System/persona instructions for the model.
            user_prompt: The user-turn content for this call.
            tools: Optional tool declarations to expose to the model.
            output_schema: Optional schema for structured output.
            effort: Backend-specific effort hint ("" = default).
            failure_context: Prior-failure summary injected on retries.

        Returns:
            A normalized ModelResponse.
        """
        ...
from anthropic import AsyncAnthropic
class AnthropicClient:
    """Concrete ModelClient backed by the Anthropic async SDK."""
    def __init__(self, api_key: str, model: str = "claude-3-5-sonnet-20241022"):
        # The model is now configurable (previously hard-coded in call());
        # the default preserves the original behavior.
        self._client = AsyncAnthropic(api_key=api_key)
        self._model = model
    async def call(self, system_prompt, user_prompt, **kwargs) -> ModelResponse:
        """Send one prompt pair to the model and normalize the response.

        NOTE(review): the protocol kwargs (tools, output_schema, effort,
        failure_context) are accepted for compatibility but not yet
        forwarded to the API — confirm whether that is intentional.
        """
        response = await self._client.messages.create(
            model=self._model,
            system=system_prompt,
            messages=[{"role": "user", "content": user_prompt}],
            # ...
        )
        # Assumes the first content block is text — TODO confirm for tool-use replies.
        return ModelResponse(content=response.content[0].text)
Execution Flow
Unit Execution
async def _execute_unit(
    self,
    unit: CompiledExecutionUnit,
    tracer: Tracer,
) -> UnitResult:
    """Execute a single execution unit (one run statement).

    Opens a trace span for the unit, runs each compiled step in order,
    and stores named step outputs in the unit's ContextManager so that
    downstream steps can reference them.

    Raises:
        Whatever _execute_step raises (anchor, validation, and
        retry-exhaustion errors propagate to the caller).
    """
    # Open a trace span
    tracer.start_span(
        f"unit:{unit.flow_name}",
        metadata={
            "persona": unit.persona_name,
            "context": unit.context_name,
        },
    )
    # Create context manager for this unit
    ctx = ContextManager(
        system_prompt=unit.system_prompt,
        tracer=tracer,
    )
    # Execute each step
    step_results: list[StepResult] = []
    try:
        for step in unit.steps:
            step_result = await self._execute_step(
                step=step,
                unit=unit,
                ctx=ctx,
                tracer=tracer,
            )
            step_results.append(step_result)
            # Store result in context for downstream steps
            if step.step_name and step_result.response:
                output = (
                    step_result.response.structured
                    or step_result.response.content
                )
                ctx.set_step_result(step.step_name, output)
    finally:
        # Fix: close the span even when a step raises — previously an
        # exception left the trace with a dangling open span.
        tracer.end_span()
    return UnitResult(
        flow_name=unit.flow_name,
        step_results=tuple(step_results),
        success=True,  # failures surface as exceptions before this point
    )
Step Execution Pipeline
async def _execute_step(
    self,
    step: CompiledStep,
    unit: CompiledExecutionUnit,
    ctx: ContextManager,
    tracer: Tracer,
) -> StepResult:
    """Execute a single compiled step.

    Pipeline:
    1. Model call
    2. Anchor checking
    3. Semantic validation
    4. Retry on failure (if configured)
    """
    tracer.emit(TraceEventType.STEP_START, step_name=step.step_name)
    # One attempt: call the model, then thread the response through the
    # CPS chain (anchors → validation → StepResult).
    async def run_step(failure_context: str = "") -> StepResult:
        response = await self._call_model(
            step=step,
            unit=unit,
            ctx=ctx,
            tracer=tracer,
            failure_context=failure_context,
        )
        # Named continuations instead of inline lambdas; same chain.
        def validation_ok(validation):
            return StepResult(
                step_name=step.step_name,
                response=response,
                validation=validation,
            )
        def validation_failed(violations):
            return self._raise_validation_error(violations)
        def anchors_ok():
            return self._validate_response_cps(
                response=response,
                step=step,
                step_name=step.step_name,
                tracer=tracer,
                on_success=validation_ok,
                on_failure=validation_failed,
            )
        def anchors_breached(violations):
            return self._raise_anchor_error(violations)
        return self._check_anchors_cps(
            response=response,
            unit=unit,
            step_name=step.step_name,
            tracer=tracer,
            on_success=anchors_ok,
            on_failure=anchors_breached,
        )
    # The retry engine drives run_step, feeding failure context back in.
    retry_result = await self._retry_engine.execute_with_retry(
        fn=run_step,
        config=self._extract_refine_config(step),
        tracer=tracer,
        step_name=step.step_name,
    )
    return retry_result.result
Continuation-Passing Style (CPS)
The Executor uses CPS for composable error handling:
Anchor Checking CPS
def _check_anchors_cps(
    self,
    response: ModelResponse,
    unit: CompiledExecutionUnit,
    step_name: str,
    tracer: Tracer,
    on_success: Callable[[], Any],
    on_failure: Callable[[list[str]], Any],
) -> Any:
    """Check anchor constraints using continuation-passing style.

    Calls on_success when every active anchor passes (or none are
    active), otherwise on_failure with all collected violation messages.
    """
    if not unit.active_anchors:
        # Nothing to check — take the success continuation immediately.
        return on_success()
    from axon.stdlib.anchors.definitions import ALL_ANCHORS
    known = {anchor.ir.name: anchor for anchor in ALL_ANCHORS}
    breaches: list[str] = []
    for entry in unit.active_anchors:
        name = entry.get("name")
        if name not in known:
            # Anchors without a stdlib definition are skipped, not failed.
            continue
        tracer.emit_anchor_check(
            anchor_name=name,
            step_name=step_name,
        )
        passed, violations = known[name].checker_fn(response.content)
        tracer.emit(
            TraceEventType.ANCHOR_PASS if passed
            else TraceEventType.ANCHOR_BREACH,
            step_name=step_name,
            data={"anchor": name, "passed": passed},
        )
        if not passed:
            breaches.extend(violations)
    return on_failure(breaches) if breaches else on_success()
Validation CPS
def _validate_response_cps(
    self,
    response: ModelResponse,
    step: CompiledStep,
    step_name: str,
    tracer: Tracer,
    on_success: Callable[[ValidationResult], Any],
    on_failure: Callable[[list[str]], Any],
) -> Any:
    """Validate response using continuation-passing style.

    Steps without an output schema pass trivially; otherwise the
    semantic validator decides which continuation is taken.
    """
    if not step.output_schema:
        # No schema to validate against — trivially valid.
        return on_success(ValidationResult())
    result = self._validator.validate(
        output=response.structured or response.content,
        expected_type=step.metadata.get("output_type", ""),
        confidence_floor=step.metadata.get("confidence_floor"),
    )
    # Build the message list once; it feeds both the tracer and on_failure.
    messages = [violation.message for violation in result.violations]
    tracer.emit_validation_result(
        step_name=step_name,
        passed=result.is_valid,
        violations=messages,
    )
    return on_success(result) if result.is_valid else on_failure(messages)
Model Call Lifecycle
async def _call_model(
    self,
    step: CompiledStep,
    unit: CompiledExecutionUnit,
    ctx: ContextManager,
    tracer: Tracer,
    failure_context: str = "",
) -> ModelResponse:
    """Make a model call for a step.

    Builds the user prompt from the step and context, delegates to the
    model client, records the exchange in the unit's message history,
    and wraps client failures in ModelCallError.
    """
    # Build user prompt with context injection
    prompt = self._build_user_prompt(step, ctx)
    # NOTE(review): prompt_tokens is a character count here, not a
    # tokenizer count — confirm the tracer expects that.
    tracer.emit_model_call(
        step_name=step.step_name,
        prompt_tokens=len(prompt),
    )
    try:
        response = await self._client.call(
            system_prompt=unit.system_prompt,
            user_prompt=prompt,
            tools=unit.tool_declarations or None,
            output_schema=step.output_schema,
            effort=unit.effort,
            failure_context=failure_context,  # ← Retry context injection
        )
    except Exception as exc:
        # Surface transport/API errors as a typed runtime error.
        raise ModelCallError(
            message=f"Model call failed for step '{step.step_name}': {exc}",
        ) from exc
    tracer.emit(
        TraceEventType.MODEL_RESPONSE,
        step_name=step.step_name,
        data={
            "content_length": len(response.content),
            "has_structured": response.structured is not None,
        },
    )
    # Mirror the exchange into the unit's conversation history.
    ctx.append_message("user", prompt)
    ctx.append_message("assistant", response.content)
    return response
Context Management
The `ContextManager` tracks per-unit state:
class ContextManager:
    """Per-unit execution state: named step outputs and chat history."""
    def __init__(self, system_prompt: str, tracer: Tracer):
        self._system_prompt = system_prompt
        self._tracer = tracer
        # Step outputs keyed by step name, for downstream references.
        self._step_results: dict[str, Any] = {}
        # Ordered user/assistant turns exchanged during this unit.
        self._message_history: list[dict[str, str]] = []
    def set_step_result(self, step_name: str, output: Any) -> None:
        """Remember *output* under *step_name* for later steps."""
        self._step_results[step_name] = output
    def get_step_result(self, step_name: str) -> Any:
        """Return a prior step's output, or None if the step never ran."""
        return self._step_results.get(step_name)
    def append_message(self, role: str, content: str) -> None:
        """Append one turn to the conversation history."""
        self._message_history.append({"role": role, "content": content})
    @property
    def completed_steps(self) -> list[str]:
        """Names of all completed steps, in insertion order."""
        return [*self._step_results]
Stored step outputs can be referenced downstream as `{{Extract.output}}` in prompts (i.e. `{{StepName.output}}`).
Data Structures
ModelResponse
@dataclass(frozen=True)
class ModelResponse:
    """Normalized response from a model call."""
    # Plain-text model output ("" when the call produced none).
    content: str = ""
    # Parsed structured output, when an output schema was in effect.
    structured: dict[str, Any] | None = None
    # Tool invocations requested by the model, if any.
    tool_calls: list[dict[str, Any]] = field(default_factory=list)
    # Model-reported confidence, when available — None otherwise.
    confidence: float | None = None
    # Token-usage counters as reported by the backend.
    usage: dict[str, int] = field(default_factory=dict)
    # The untouched backend response object, for debugging.
    raw: Any = None
StepResult
@dataclass(frozen=True)
class StepResult:
    """Result of executing a single compiled step."""
    # Name of the step ("" for anonymous steps).
    step_name: str = ""
    # The model (or tool-derived) response; None if no call was made.
    response: ModelResponse | None = None
    # Semantic validation outcome; None when no schema applied.
    validation: ValidationResult | None = None
    # Retry-engine bookkeeping for this step, when retries ran.
    retry_info: RetryResult | None = None
    # Wall-clock duration of the step in milliseconds.
    duration_ms: float = 0.0
ExecutionResult
@dataclass(frozen=True)
class ExecutionResult:
    """Result of executing a complete AXON program."""
    # One UnitResult per run statement, in execution order.
    unit_results: tuple[UnitResult, ...] = ()
    # Finalized execution trace; None if tracing was unavailable.
    trace: ExecutionTrace | None = None
    # True only when every unit reported success.
    success: bool = True
    # Wall-clock duration of the whole program in milliseconds.
    duration_ms: float = 0.0
Tool Step Execution
When a step uses a tool, execution routes through the ToolDispatcher:async def _execute_tool_step(
self,
step: CompiledStep,
ctx: ContextManager,
tracer: Tracer,
) -> StepResult:
"""Execute a step that uses a tool."""
use_tool_meta = step.metadata["use_tool"]
if self._tool_dispatcher is None:
raise AxonRuntimeError(
message=f"Step '{step.step_name}' requires a tool but "
"no ToolDispatcher was provided."
)
from axon.compiler.ir_nodes import IRUseTool
ir_use_tool = IRUseTool(
tool_name=use_tool_meta.get("tool_name", ""),
argument=self._build_user_prompt(step, ctx),
)
tool_result = await self._tool_dispatcher.dispatch(
ir_use_tool,
context={"step_name": step.step_name},
)
# Convert ToolResult → ModelResponse
response = ModelResponse(
content=json.dumps(tool_result.data) if tool_result.data else "",
structured=tool_result.data if isinstance(tool_result.data, dict) else None,
)
if not tool_result.success:
raise AxonRuntimeError(
message=f"Tool '{ir_use_tool.tool_name}' failed: {tool_result.error}"
)
return StepResult(
step_name=step.step_name,
response=response,
)
Error Handling
Error Types
class AxonRuntimeError(Exception):
    """Base class for all AXON runtime errors."""


class ModelCallError(AxonRuntimeError):
    """Raised when an LLM API call fails."""


class AnchorBreachError(AxonRuntimeError):
    """Raised when an anchor constraint is violated."""


class ValidationError(AxonRuntimeError):
    """Raised when semantic validation fails."""


class RefineExhaustedError(AxonRuntimeError):
    """Raised when all retry attempts are exhausted."""
Error Context
@dataclass
class ErrorContext:
    """Structured context for runtime errors."""
    # Step being executed when the error occurred ("" if unknown).
    step_name: str = ""
    # Enclosing flow name ("" if unknown).
    flow_name: str = ""
    # Retry attempt number at failure time (0 = first attempt).
    attempt: int = 0
    # Type the step was expected to produce, per its contract.
    expected_type: str = ""
    # The value actually produced; None when unavailable.
    actual_value: Any = None
    # Free-form human-readable description of the failure.
    details: str = ""
Usage Example
import asyncio
from pathlib import Path

from axon.compiler import Lexer, Parser, TypeChecker, IRGenerator
from axon.backends.anthropic_backend import AnthropicBackend
from axon.runtime.executor import Executor
from anthropic import AsyncAnthropic

# Phase 1: Compile source to IR
source = Path("program.axon").read_text()  # no dangling file handle
tokens = Lexer(source).tokenize()
ast = Parser(tokens).parse()
errors = TypeChecker(ast).check()
if errors:
    # Fail with the actual diagnostics instead of a bare Exception.
    raise RuntimeError(f"Type errors found: {errors}")
ir_program = IRGenerator().generate(ast)

# Phase 2: Compile IR to prompts
backend = AnthropicBackend()
compiled = backend.compile_program(ir_program)

# Phase 3: Execute (execute() is async, so drive it with asyncio.run
# from synchronous code — a bare top-level `await` is invalid here)
client = AnthropicClient(api_key="...")
executor = Executor(client=client)
result = asyncio.run(executor.execute(compiled))
if result.success:
    for unit in result.unit_results:
        for step in unit.step_results:
            print(f"{step.step_name}: {step.response.content}")
Next Steps
Semantic Validator
Learn how type contracts are enforced
Retry Engine
See how adaptive retry works
Tracer
Understand execution observability
