Skip to main content

Execution Tracer

The Tracer produces a structured, semantic log of every decision the AXON runtime makes.
Execution Events → [Tracer] → ExecutionTrace (structured semantic log)

Why Tracing?

Traditional logs answer what happened. AXON traces answer why:
  • Which anchor activated?
  • Which reasoning path was taken?
  • Which retry was triggered?
  • What did the validator decide?
  • How long did each step take?
Key Principle: Traces are semantic, not mechanical. They track cognitive decisions, not CPU cycles.

Architecture

Four Core Types

TraceEvent        # A single atomic observation
TraceSpan         # A named scope containing events and sub-spans
ExecutionTrace    # The root container for a full program execution
Tracer            # The recorder: start_span(), emit(), end_span()

Trace Hierarchy

ExecutionTrace
├── Span: unit:AnalyzeContract
│   ├── Event: step_start (Extract)
│   ├── Event: model_call (Extract)
│   ├── Event: model_response (Extract)
│   ├── Event: anchor_check (NoHallucination)
│   ├── Event: anchor_pass (NoHallucination)
│   ├── Event: validation_pass (Extract)
│   ├── Event: step_end (Extract)
│   ├── Event: step_start (Reason)
│   └── ...
└── metadata: {backend: "anthropic", duration_ms: 12453}

Event Types

The Tracer emits 14 semantic event types:
class TraceEventType(str, Enum):
    # Step lifecycle
    STEP_START = "step_start"
    STEP_END = "step_end"

    # Model interaction
    MODEL_CALL = "model_call"
    MODEL_RESPONSE = "model_response"

    # Anchor enforcement
    ANCHOR_CHECK = "anchor_check"
    ANCHOR_PASS = "anchor_pass"
    ANCHOR_BREACH = "anchor_breach"

    # Semantic validation
    VALIDATION_PASS = "validation_pass"
    VALIDATION_FAIL = "validation_fail"

    # Retry / refine
    RETRY_ATTEMPT = "retry_attempt"
    REFINE_START = "refine_start"

    # Memory operations
    MEMORY_READ = "memory_read"
    MEMORY_WRITE = "memory_write"

    # Confidence
    CONFIDENCE_CHECK = "confidence_check"

Core Data Structures

TraceEvent

@dataclass(frozen=True)
class TraceEvent:
    """A single atomic event in the execution trace."""
    event_type: TraceEventType
    timestamp: float            # Unix timestamp (seconds)
    step_name: str = ""
    data: dict[str, Any] = field(default_factory=dict)
    duration_ms: float = 0.0
Example:
TraceEvent(
    event_type=TraceEventType.ANCHOR_CHECK,
    timestamp=1704067200.123,
    step_name="Extract",
    data={"anchor": "NoHallucination", "instruction": "cite all sources"},
)

TraceSpan

@dataclass
class TraceSpan:
    """A named scope containing child events and nested sub-spans."""
    name: str
    start_time: float = 0.0
    end_time: float = 0.0
    events: list[TraceEvent] = field(default_factory=list)
    children: list[TraceSpan] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def duration_ms(self) -> float:
        if self.end_time <= 0:
            return 0.0
        return round((self.end_time - self.start_time) * 1000, 2)
Example:
TraceSpan(
    name="unit:AnalyzeContract",
    start_time=1704067200.0,
    end_time=1704067212.5,
    events=[...],  # 15 events
    children=[],
    metadata={"persona": "LegalExpert", "context": "LegalReview"},
)

ExecutionTrace

@dataclass
class ExecutionTrace:
    """Root container for a complete program execution trace."""
    program_name: str = ""
    backend_name: str = ""
    start_time: float = 0.0
    end_time: float = 0.0
    spans: list[TraceSpan] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def duration_ms(self) -> float:
        if self.end_time <= 0:
            return 0.0
        return round((self.end_time - self.start_time) * 1000, 2)

    @property
    def total_events(self) -> int:
        return self._count_events(self.spans)

The Tracer

Initialization

class Tracer:
    """Records semantic execution events into a structured trace."""

    def __init__(
        self,
        program_name: str = "",
        backend_name: str = "",
    ) -> None:
        self._trace = ExecutionTrace(
            program_name=program_name,
            backend_name=backend_name,
            start_time=time.time(),
        )
        self._span_stack: list[TraceSpan] = []

Span Management

def start_span(
    self,
    name: str,
    metadata: dict[str, Any] | None = None,
) -> TraceSpan:
    """Open a new span as a child of the current innermost span."""
    span = TraceSpan(
        name=name,
        start_time=time.time(),
        metadata=metadata or {},
    )

    if self._span_stack:
        self._span_stack[-1].children.append(span)
    else:
        self._trace.spans.append(span)

    self._span_stack.append(span)
    return span

def end_span(self, metadata: dict[str, Any] | None = None) -> TraceSpan | None:
    """Close the current innermost span."""
    if not self._span_stack:
        return None

    span = self._span_stack.pop()
    span.end_time = time.time()

    if metadata:
        span.metadata.update(metadata)

    return span
Usage:
tracer.start_span("unit:AnalyzeContract", metadata={"persona": "LegalExpert"})
tracer.emit(TraceEventType.STEP_START, step_name="Extract")
# ... execution ...
tracer.end_span(metadata={"duration_ms": 1234})

Event Emission

def emit(
    self,
    event_type: TraceEventType,
    step_name: str = "",
    data: dict[str, Any] | None = None,
    duration_ms: float = 0.0,
) -> TraceEvent:
    """Record a semantic event in the current span."""
    event = TraceEvent(
        event_type=event_type,
        timestamp=time.time(),
        step_name=step_name,
        data=data or {},
        duration_ms=duration_ms,
    )

    if self._span_stack:
        self._span_stack[-1].events.append(event)

    return event
Usage:
tracer.emit(
    TraceEventType.ANCHOR_BREACH,
    step_name="Extract",
    data={
        "anchor": "NoHallucination",
        "violations": ["No sources cited"],
    },
)

Convenience Emitters

Model Call

def emit_model_call(
    self,
    step_name: str,
    prompt_tokens: int = 0,
    data: dict[str, Any] | None = None,
) -> TraceEvent:
    payload = data or {}
    if prompt_tokens > 0:
        payload["prompt_tokens"] = prompt_tokens
    return self.emit(TraceEventType.MODEL_CALL, step_name=step_name, data=payload)

Anchor Check

def emit_anchor_check(
    self,
    anchor_name: str,
    step_name: str = "",
    passed: bool = True,
    data: dict[str, Any] | None = None,
) -> TraceEvent:
    payload = data or {}
    payload["anchor_name"] = anchor_name

    self.emit(TraceEventType.ANCHOR_CHECK, step_name=step_name, data=payload)

    result_type = TraceEventType.ANCHOR_PASS if passed else TraceEventType.ANCHOR_BREACH
    return self.emit(result_type, step_name=step_name, data=payload)

Validation Result

def emit_validation_result(
    self,
    step_name: str,
    passed: bool,
    expected_type: str = "",
    violations: list[str] | None = None,
    data: dict[str, Any] | None = None,
) -> TraceEvent:
    payload = data or {}
    if expected_type:
        payload["expected_type"] = expected_type
    if violations:
        payload["violations"] = violations

    event_type = (
        TraceEventType.VALIDATION_PASS if passed else TraceEventType.VALIDATION_FAIL
    )
    return self.emit(event_type, step_name=step_name, data=payload)

Retry Attempt

def emit_retry_attempt(
    self,
    step_name: str,
    attempt: int,
    reason: str = "",
    data: dict[str, Any] | None = None,
) -> TraceEvent:
    payload = data or {}
    payload["attempt"] = attempt
    if reason:
        payload["reason"] = reason
    return self.emit(
        TraceEventType.RETRY_ATTEMPT, step_name=step_name, data=payload
    )

Finalization

def finalize(self) -> ExecutionTrace:
    """Close all remaining spans and return the complete trace."""
    while self._span_stack:
        self.end_span()

    self._trace.end_time = time.time()
    return self._trace
Usage:
tracer = Tracer(program_name="contract_analysis", backend_name="anthropic")
# ... execution ...
trace = tracer.finalize()

print(f"Total duration: {trace.duration_ms}ms")
print(f"Total events: {trace.total_events}")

JSON Serialization

The entire trace is fully JSON-serializable:
trace = tracer.finalize()
trace_dict = trace.to_dict()

import json
with open("execution_trace.json", "w") as f:
    json.dump(trace_dict, f, indent=2)
Example Output:
{
  "program_name": "contract_analysis",
  "backend_name": "anthropic",
  "start_time": 1704067200.0,
  "duration_ms": 12453.5,
  "total_events": 28,
  "spans": [
    {
      "name": "unit:AnalyzeContract",
      "start_time": 1704067200.0,
      "duration_ms": 12453.5,
      "events": [
        {
          "event_type": "step_start",
          "timestamp": 1704067200.1,
          "step_name": "Extract"
        },
        {
          "event_type": "model_call",
          "timestamp": 1704067200.2,
          "step_name": "Extract",
          "data": {"prompt_tokens": 1247}
        },
        {
          "event_type": "anchor_check",
          "timestamp": 1704067202.8,
          "step_name": "Extract",
          "data": {"anchor": "NoHallucination"}
        },
        {
          "event_type": "anchor_pass",
          "timestamp": 1704067202.9,
          "step_name": "Extract"
        }
      ],
      "metadata": {
        "persona": "LegalExpert",
        "context": "LegalReview"
      }
    }
  ]
}

Integration Points

Executor

async def execute(self, program: CompiledProgram) -> ExecutionResult:
    tracer = Tracer(
        program_name=program.metadata.get("program_name", ""),
        backend_name=program.backend_name,
    )

    for unit in program.execution_units:
        unit_result = await self._execute_unit(unit, tracer)
        # ...

    trace = tracer.finalize()
    return ExecutionResult(trace=trace, ...)

Validator

if tracer:
    tracer.emit_validation_result(
        step_name=step_name,
        passed=result.is_valid,
        violations=[v.message for v in result.violations],
    )

RetryEngine

if tracer:
    tracer.emit_retry_attempt(
        step_name=step_name,
        attempt=attempt_num,
        reason=last_error,
    )

Memory

if self._tracer:
    self._tracer.emit(
        TraceEventType.MEMORY_WRITE,
        data={"key": key, "value_type": type(value).__name__},
    )

Example: Full Trace

AXON Program

flow AnalyzeContract(doc: Document) -> ContractAnalysis {
  step Extract {
    given: doc
    probe doc for [parties, dates]
    output: EntityMap
  }
  
  reason about Risks {
    given: Extract.output
    depth: 2
    show_work: true
  }
}

run AnalyzeContract(contract.pdf)
  as LegalExpert
  constrained_by [NoHallucination]

Resulting Trace (Simplified)

{
  "program_name": "contract_analysis",
  "backend_name": "anthropic",
  "duration_ms": 8742.3,
  "total_events": 18,
  "spans": [
    {
      "name": "unit:AnalyzeContract",
      "duration_ms": 8742.3,
      "events": [
        {"event_type": "step_start", "step_name": "Extract"},
        {"event_type": "model_call", "step_name": "Extract", "data": {"prompt_tokens": 1247}},
        {"event_type": "model_response", "step_name": "Extract", "duration_ms": 2341.5},
        {"event_type": "anchor_check", "step_name": "Extract", "data": {"anchor": "NoHallucination"}},
        {"event_type": "anchor_pass", "step_name": "Extract"},
        {"event_type": "validation_pass", "step_name": "Extract"},
        {"event_type": "step_end", "step_name": "Extract", "duration_ms": 2456.8},
        {"event_type": "step_start", "step_name": "Reason"},
        {"event_type": "model_call", "step_name": "Reason", "data": {"depth": 2, "show_work": true}},
        {"event_type": "model_response", "step_name": "Reason", "duration_ms": 5823.7},
        {"event_type": "anchor_check", "step_name": "Reason"},
        {"event_type": "anchor_pass", "step_name": "Reason"},
        {"event_type": "validation_pass", "step_name": "Reason"},
        {"event_type": "step_end", "step_name": "Reason", "duration_ms": 6124.2}
      ],
      "metadata": {
        "persona": "LegalExpert",
        "effort": "high"
      }
    }
  ]
}

Observability

Metrics Extraction

trace = result.trace

# Total execution time
print(f"Duration: {trace.duration_ms}ms")

# Event counts by type
from collections import Counter
event_types = []
for span in trace.spans:
    event_types.extend(e.event_type.value for e in span.events)
counts = Counter(event_types)
print(f"Anchor checks: {counts['anchor_check']}")
print(f"Retries: {counts['retry_attempt']}")

# Anchor breach rate
total_checks = counts['anchor_check']
breaches = counts['anchor_breach']
if total_checks > 0:
    breach_rate = breaches / total_checks * 100
    print(f"Anchor breach rate: {breach_rate:.1f}%")

Debugging Failures

# Find all validation failures
for span in trace.spans:
    for event in span.events:
        if event.event_type == TraceEventType.VALIDATION_FAIL:
            print(f"Step: {event.step_name}")
            print(f"Violations: {event.data['violations']}")

Next Steps

Executor

See how tracing integrates into execution

Memory Backend

Learn about semantic storage tracing

Retry Engine

Understand retry attempt tracking

Build docs developers (and LLMs) love