Skip to main content

Runtime Executor

The Executor is the heart of Phase 3: it orchestrates the complete execution of compiled AXON programs.
CompiledProgram → [Executor] → ExecutionResult (with trace, validation, retry)

Architecture Overview

The Executor coordinates six key subsystems:
  1. Model Client — LLM API calls (Anthropic, OpenAI, etc.)
  2. Context Manager — Per-unit state tracking
  3. Semantic Validator — Type contract enforcement
  4. Retry Engine — Adaptive retry with failure context
  5. Memory Backend — Persistent semantic storage
  6. Tracer — Complete execution observability

Implementation

from axon.backends.base_backend import CompiledProgram, CompiledExecutionUnit, CompiledStep
from axon.runtime.semantic_validator import SemanticValidator
from axon.runtime.retry_engine import RetryEngine
from axon.runtime.memory_backend import MemoryBackend, InMemoryBackend
from axon.runtime.tracer import Tracer, TraceEventType
from axon.runtime.context_mgr import ContextManager

class Executor:
    """Drives end-to-end execution of a compiled AXON program.

    Wires together the model client, semantic validator, retry engine,
    memory backend, and tool dispatcher, then walks each execution unit
    of the program in order under a single tracer.
    """

    def __init__(
        self,
        client: ModelClient,
        *,
        validator: SemanticValidator | None = None,
        retry_engine: RetryEngine | None = None,
        memory: MemoryBackend | None = None,
        tool_dispatcher: ToolDispatcher | None = None,
    ) -> None:
        # Collaborators fall back to sensible defaults when not injected;
        # the tool dispatcher stays optional (tool steps fail without it).
        self._client = client
        self._validator = validator or SemanticValidator()
        self._retry_engine = retry_engine or RetryEngine()
        self._memory = memory or InMemoryBackend()
        self._tool_dispatcher = tool_dispatcher

    async def execute(self, program: CompiledProgram) -> ExecutionResult:
        """Run every execution unit of *program* and aggregate the results."""
        tracer = Tracer(
            program_name=program.metadata.get("program_name", ""),
            backend_name=program.backend_name,
        )

        # Units run sequentially; each awaits completion before the next.
        results = [
            await self._execute_unit(unit, tracer)
            for unit in program.execution_units
        ]

        return ExecutionResult(
            unit_results=tuple(results),
            trace=tracer.finalize(),
            success=all(r.success for r in results),
        )

Model Client Protocol

The Executor is decoupled from LLM APIs via the ModelClient protocol:
from typing import Protocol

class ModelClient(Protocol):
    """Protocol for LLM model interaction.

    Structural typing: any object with a matching async ``call`` method
    satisfies this protocol — no inheritance required.
    """

    async def call(
        self,
        system_prompt: str,
        user_prompt: str,
        *,
        tools: list[dict[str, Any]] | None = None,
        output_schema: dict[str, Any] | None = None,
        effort: str = "",
        failure_context: str = "",
    ) -> ModelResponse:
        """Send a prompt to the model and return the response.

        Args:
            system_prompt: System-level instructions for the model.
            user_prompt: The user-turn content for this call.
            tools: Tool declarations to expose to the model, if any.
            output_schema: Schema the structured output should conform to.
            effort: Backend-specific effort hint (empty string = default).
            failure_context: Prior-failure description injected on retries.
        """
        ...
Any implementation (Anthropic, OpenAI, mock) can drive execution:
from anthropic import AsyncAnthropic

class AnthropicClient:
    """ModelClient implementation backed by the Anthropic Messages API."""

    def __init__(self, api_key: str):
        self._client = AsyncAnthropic(api_key=api_key)

    async def call(self, system_prompt, user_prompt, **kwargs) -> ModelResponse:
        # max_tokens is a *required* parameter of the Messages API; without
        # it the call is rejected by the server.
        response = await self._client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            system=system_prompt,
            messages=[{"role": "user", "content": user_prompt}],
            # ...
        )
        # First content block is assumed to be text — TODO confirm for
        # tool-use responses, where content[0] may not have a .text.
        return ModelResponse(content=response.content[0].text)

Execution Flow

Unit Execution

async def _execute_unit(
    self,
    unit: CompiledExecutionUnit,
    tracer: Tracer,
) -> UnitResult:
    """Execute a single execution unit (one run statement).

    Opens a trace span for the unit, runs each compiled step in order,
    and publishes named step outputs into the context so downstream
    steps can reference them.

    Args:
        unit: The compiled unit (persona, context, flow steps) to run.
        tracer: Shared tracer receiving span and step events.

    Returns:
        UnitResult aggregating every step's result.
    """

    # Open a trace span
    tracer.start_span(
        f"unit:{unit.flow_name}",
        metadata={
            "persona": unit.persona_name,
            "context": unit.context_name,
        },
    )

    # Create context manager for this unit
    ctx = ContextManager(
        system_prompt=unit.system_prompt,
        tracer=tracer,
    )

    # Execute each step
    step_results: list[StepResult] = []
    try:
        for step in unit.steps:
            step_result = await self._execute_step(
                step=step,
                unit=unit,
                ctx=ctx,
                tracer=tracer,
            )
            step_results.append(step_result)

            # Store result in context for downstream steps; structured
            # output wins over raw text when both are present.
            if step.step_name and step_result.response:
                output = (
                    step_result.response.structured
                    or step_result.response.content
                )
                ctx.set_step_result(step.step_name, output)
    finally:
        # Close the span even when a step raises, so the trace never
        # ends up with an unbalanced open span.
        tracer.end_span()

    return UnitResult(
        flow_name=unit.flow_name,
        step_results=tuple(step_results),
        success=True,
    )

Step Execution Pipeline

async def _execute_step(
    self,
    step: CompiledStep,
    unit: CompiledExecutionUnit,
    ctx: ContextManager,
    tracer: Tracer,
) -> StepResult:
    """Execute a single compiled step.
    
    Pipeline:
    1. Model call
    2. Anchor checking
    3. Semantic validation
    4. Retry on failure (if configured)

    Args:
        step: The compiled step to execute.
        unit: Enclosing unit supplying prompts, anchors, and tools.
        ctx: Per-unit context with prior step outputs.
        tracer: Tracer receiving step lifecycle events.

    Returns:
        The StepResult from the final (possibly retried) attempt.
    """
    
    tracer.emit(TraceEventType.STEP_START, step_name=step.step_name)

    # Build the step callable with CPS validation.  The retry engine
    # re-invokes this closure on each attempt, passing the previous
    # attempt's violations as failure_context.
    async def run_step(failure_context: str = "") -> StepResult:
        response = await self._call_model(
            step=step,
            unit=unit,
            ctx=ctx,
            tracer=tracer,
            failure_context=failure_context,
        )

        # CPS chain: anchors → validation → success
        return self._check_anchors_cps(
            response=response,
            unit=unit,
            step_name=step.step_name,
            tracer=tracer,
            on_success=lambda: self._validate_response_cps(
                response=response,
                step=step,
                step_name=step.step_name,
                tracer=tracer,
                on_success=lambda validation: StepResult(
                    step_name=step.step_name,
                    response=response,
                    validation=validation,
                ),
                # Failure continuations raise, which the retry engine
                # catches to decide whether to re-attempt.
                on_failure=lambda violations: self._raise_validation_error(violations),
            ),
            on_failure=lambda violations: self._raise_anchor_error(violations),
        )

    # Execute with retry
    retry_config = self._extract_refine_config(step)
    retry_result = await self._retry_engine.execute_with_retry(
        fn=run_step,
        config=retry_config,
        tracer=tracer,
        step_name=step.step_name,
    )

    return retry_result.result

Continuation-Passing Style (CPS)

The Executor uses CPS for composable error handling:

Anchor Checking CPS

def _check_anchors_cps(
    self,
    response: ModelResponse,
    unit: CompiledExecutionUnit,
    step_name: str,
    tracer: Tracer,
    on_success: Callable[[], Any],
    on_failure: Callable[[list[str]], Any],
) -> Any:
    """Verify every active anchor against the response, CPS-style.

    Invokes *on_success* when no anchor is breached, otherwise invokes
    *on_failure* with all accumulated violation messages.
    """

    # No anchors configured: succeed immediately.
    if not unit.active_anchors:
        return on_success()

    from axon.stdlib.anchors.definitions import ALL_ANCHORS
    by_name = {anchor.ir.name: anchor for anchor in ALL_ANCHORS}

    breaches: list[str] = []

    for entry in unit.active_anchors:
        name = entry.get("name")
        stdlib_anchor = by_name.get(name)
        if stdlib_anchor is None:
            # Unknown anchors are skipped, not treated as failures.
            continue

        tracer.emit_anchor_check(
            anchor_name=name,
            step_name=step_name,
        )

        passed, violations = stdlib_anchor.checker_fn(response.content)

        tracer.emit(
            TraceEventType.ANCHOR_PASS if passed
            else TraceEventType.ANCHOR_BREACH,
            step_name=step_name,
            data={"anchor": name, "passed": passed},
        )

        if not passed:
            breaches.extend(violations)

    return on_failure(breaches) if breaches else on_success()
Why CPS? It enables clean composition of validation layers without nested try/except blocks.

Validation CPS

def _validate_response_cps(
    self,
    response: ModelResponse,
    step: CompiledStep,
    step_name: str,
    tracer: Tracer,
    on_success: Callable[[ValidationResult], Any],
    on_failure: Callable[[list[str]], Any],
) -> Any:
    """Run semantic validation on the response, CPS-style.

    Calls *on_success* with the ValidationResult when the response
    satisfies the step's output contract, otherwise calls *on_failure*
    with the violation messages.
    """

    # Steps without an output schema are trivially valid.
    if not step.output_schema:
        return on_success(ValidationResult())

    # Prefer structured output; fall back to raw text.
    payload = response.structured or response.content
    outcome = self._validator.validate(
        output=payload,
        expected_type=step.metadata.get("output_type", ""),
        confidence_floor=step.metadata.get("confidence_floor"),
    )

    messages = [violation.message for violation in outcome.violations]
    tracer.emit_validation_result(
        step_name=step_name,
        passed=outcome.is_valid,
        violations=messages,
    )

    if outcome.is_valid:
        return on_success(outcome)
    return on_failure(messages)

Model Call Lifecycle

async def _call_model(
    self,
    step: CompiledStep,
    unit: CompiledExecutionUnit,
    ctx: ContextManager,
    tracer: Tracer,
    failure_context: str = "",
) -> ModelResponse:
    """Make a model call for a step.

    Builds the user prompt from step template plus prior step outputs,
    calls the configured ModelClient, emits trace events, and records
    both sides of the exchange in the context's message history.

    Args:
        step: Compiled step providing the prompt template and schema.
        unit: Enclosing unit providing system prompt, tools, and effort.
        ctx: Context used for prompt interpolation and message history.
        tracer: Tracer receiving model-call events.
        failure_context: Violations from a prior attempt, injected on retry.

    Raises:
        ModelCallError: If the underlying client call raises any exception.
    """
    
    # Build user prompt with context injection
    user_prompt = self._build_user_prompt(step, ctx)

    # NOTE(review): this is a character count, not a token count —
    # confirm the tracer's prompt_tokens field expects this.
    tracer.emit_model_call(
        step_name=step.step_name,
        prompt_tokens=len(user_prompt),
    )

    try:
        response = await self._client.call(
            system_prompt=unit.system_prompt,
            user_prompt=user_prompt,
            tools=unit.tool_declarations or None,
            output_schema=step.output_schema,
            effort=unit.effort,
            failure_context=failure_context,  # ← Retry context injection
        )
    except Exception as exc:
        # Wrap every client failure in a runtime-domain error, keeping
        # the original exception chained for debugging.
        raise ModelCallError(
            message=f"Model call failed for step '{step.step_name}': {exc}",
        ) from exc

    tracer.emit(
        TraceEventType.MODEL_RESPONSE,
        step_name=step.step_name,
        data={
            "content_length": len(response.content),
            "has_structured": response.structured is not None,
        },
    )

    # Record in context message history
    ctx.append_message("user", user_prompt)
    ctx.append_message("assistant", response.content)

    return response

Context Management

The ContextManager tracks per-unit state:
class ContextManager:
    """Tracks per-unit execution state: step outputs and message history."""

    def __init__(self, system_prompt: str, tracer: Tracer):
        self._system_prompt = system_prompt
        self._tracer = tracer
        # step name -> that step's output, for downstream references
        self._step_results: dict[str, Any] = {}
        # ordered user/assistant message records
        self._message_history: list[dict[str, str]] = []

    def set_step_result(self, step_name: str, output: Any) -> None:
        """Record *output* as the result of *step_name*."""
        self._step_results[step_name] = output

    def get_step_result(self, step_name: str) -> Any:
        """Return a prior step's output, or None if the step is unknown."""
        return self._step_results.get(step_name)

    def append_message(self, role: str, content: str) -> None:
        """Append one message to the conversation transcript."""
        self._message_history.append({"role": role, "content": content})

    @property
    def completed_steps(self) -> list[str]:
        """Names of every step whose result has been stored, in order."""
        return list(self._step_results)
Usage: Resolves step references like {{Extract.output}} in prompts.

Data Structures

ModelResponse

@dataclass(frozen=True)
class ModelResponse:
    """Normalized response from a model call.

    Immutable value object shared by all ModelClient implementations.
    """
    content: str = ""  # raw text content returned by the model
    structured: dict[str, Any] | None = None  # parsed structured output, when present
    tool_calls: list[dict[str, Any]] = field(default_factory=list)  # tool invocations requested by the model
    confidence: float | None = None  # model-reported confidence, when available
    usage: dict[str, int] = field(default_factory=dict)  # provider usage counters (presumably tokens — confirm)
    raw: Any = None  # unmodified provider response object, for debugging

StepResult

@dataclass(frozen=True)
class StepResult:
    """Result of executing a single compiled step."""
    step_name: str = ""  # name of the step that produced this result
    response: ModelResponse | None = None  # model (or tool-converted) response
    validation: ValidationResult | None = None  # semantic validation outcome, if validated
    retry_info: RetryResult | None = None  # retry-engine metadata, if retries occurred
    duration_ms: float = 0.0  # wall-clock duration of the step

ExecutionResult

@dataclass(frozen=True)
class ExecutionResult:
    """Result of executing a complete AXON program."""
    unit_results: tuple[UnitResult, ...] = ()  # one result per execution unit, in run order
    trace: ExecutionTrace | None = None  # finalized trace of the whole run
    success: bool = True  # True only if every unit succeeded
    duration_ms: float = 0.0  # wall-clock duration of the full program

Tool Step Execution

When a step uses a tool, execution routes through the ToolDispatcher:
async def _execute_tool_step(
    self,
    step: CompiledStep,
    ctx: ContextManager,
    tracer: Tracer,
) -> StepResult:
    """Execute a step that routes through a tool instead of the model.

    Args:
        step: Compiled step whose metadata carries the "use_tool" entry.
        ctx: Context used to build the tool's argument string.
        tracer: Tracer (passed through for observability).

    Raises:
        AxonRuntimeError: If no ToolDispatcher is configured, or the
            tool reports failure.
    """

    # Fail fast before touching step metadata if tools are unavailable.
    if self._tool_dispatcher is None:
        raise AxonRuntimeError(
            message=f"Step '{step.step_name}' requires a tool but "
                    "no ToolDispatcher was provided."
        )

    use_tool_meta = step.metadata["use_tool"]

    from axon.compiler.ir_nodes import IRUseTool
    ir_use_tool = IRUseTool(
        tool_name=use_tool_meta.get("tool_name", ""),
        argument=self._build_user_prompt(step, ctx),
    )

    tool_result = await self._tool_dispatcher.dispatch(
        ir_use_tool,
        context={"step_name": step.step_name},
    )

    # Surface tool failure before doing any conversion work.
    if not tool_result.success:
        raise AxonRuntimeError(
            message=f"Tool '{ir_use_tool.tool_name}' failed: {tool_result.error}"
        )

    # Convert ToolResult → ModelResponse
    response = ModelResponse(
        content=json.dumps(tool_result.data) if tool_result.data else "",
        structured=tool_result.data if isinstance(tool_result.data, dict) else None,
    )

    return StepResult(
        step_name=step.step_name,
        response=response,
    )

Error Handling

Error Types

class AxonRuntimeError(Exception):
    """Base class for all AXON runtime errors.

    Catching this type handles every runtime failure mode below.
    """


class ModelCallError(AxonRuntimeError):
    """Raised when an LLM API call fails."""


class AnchorBreachError(AxonRuntimeError):
    """Raised when an anchor constraint is violated."""


class ValidationError(AxonRuntimeError):
    """Raised when semantic validation fails."""


class RefineExhaustedError(AxonRuntimeError):
    """Raised when all retry attempts are exhausted."""

Error Context

@dataclass
class ErrorContext:
    """Structured context for runtime errors."""
    step_name: str = ""  # step where the error occurred
    flow_name: str = ""  # enclosing flow's name
    attempt: int = 0  # attempt number at failure time (presumably retry count — confirm)
    expected_type: str = ""  # type the validator expected
    actual_value: Any = None  # value that failed validation
    details: str = ""  # free-form diagnostic details

Usage Example

from axon.compiler import Lexer, Parser, TypeChecker, IRGenerator
from axon.backends.anthropic_backend import AnthropicBackend
from axon.runtime.executor import Executor
from anthropic import AsyncAnthropic

# Phase 1: Compile source to IR
source = open("program.axon").read()
tokens = Lexer(source).tokenize()
ast = Parser(tokens).parse()
errors = TypeChecker(ast).check()
if errors:
    raise Exception("Type errors found")
ir_program = IRGenerator().generate(ast)

# Phase 2: Compile IR to prompts
backend = AnthropicBackend()
compiled = backend.compile_program(ir_program)

# Phase 3: Execute
client = AnthropicClient(api_key="...")
executor = Executor(client=client)
result = await executor.execute(compiled)

if result.success:
    for unit in result.unit_results:
        for step in unit.step_results:
            print(f"{step.step_name}: {step.response.content}")

Next Steps

Semantic Validator

Learn how type contracts are enforced

Retry Engine

See how adaptive retry works

Tracer

Understand execution observability

Build docs developers (and LLMs) love