Retry Engine

The RetryEngine provides adaptive retry with failure context injection for refine blocks in AXON programs.
Step Execution → [Retry Engine] → Success | Retry with Context | Exhaustion

Key Features

  1. Failure Context Injection — Pass previous error to next attempt
  2. Configurable Backoff — None, linear, or exponential delays
  3. Attempt Tracking — Full record of each retry
  4. Exhaustion Handling — Configurable behavior when all attempts fail
  5. Trace Integration — Every retry is logged

Configuration

RefineConfig

from dataclasses import dataclass

@dataclass(frozen=True)
class RefineConfig:
    """Configuration for a retry/refine block."""
    max_attempts: int = 3
    pass_failure_context: bool = True
    backoff: str = "none"  # none | linear | exponential
    on_exhaustion: str = ""  # "" (raise) | "skip" | "fallback"
    on_exhaustion_target: str = ""
Maps from IRRefine:
refine {
  max_attempts: 3
  pass_failure_context: true
  backoff: exponential
  on_exhaustion: raise ValidationError
}
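
Because `backoff` and `on_exhaustion` are plain strings, a typo is not rejected by the dataclass itself. In the engine code shown on this page, an unknown backoff string falls through to a 0-second delay, and a `max_attempts` below 1 skips the loop and goes straight to exhaustion. The sketch below shows one way such values could be checked up front. `validate_refine_config` and the two `ALLOWED_*` sets are hypothetical names introduced for illustration, not part of the documented API.

```python
from dataclasses import dataclass

# Allowed values, taken from the comments on the RefineConfig fields above.
ALLOWED_BACKOFF = {"none", "linear", "exponential"}
ALLOWED_ON_EXHAUSTION = {"", "skip", "fallback"}


@dataclass(frozen=True)
class RefineConfig:
    """Local copy of the dataclass shown above, so the sketch is self-contained."""
    max_attempts: int = 3
    pass_failure_context: bool = True
    backoff: str = "none"
    on_exhaustion: str = ""
    on_exhaustion_target: str = ""


def validate_refine_config(config: RefineConfig) -> list[str]:
    """Hypothetical helper: return a list of problems (empty if the config looks sane)."""
    problems: list[str] = []
    if config.max_attempts < 1:
        problems.append(f"max_attempts must be >= 1, got {config.max_attempts}")
    if config.backoff not in ALLOWED_BACKOFF:
        problems.append(f"unknown backoff strategy: {config.backoff!r}")
    if config.on_exhaustion not in ALLOWED_ON_EXHAUSTION:
        problems.append(f"unknown on_exhaustion action: {config.on_exhaustion!r}")
    return problems
```

Returning a list of problems rather than raising on the first one lets a caller report every issue in a refine block at once.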

Implementation

from typing import Awaitable, Callable, Any
import asyncio

class RetryEngine:
    """Wraps step execution with configurable retry and refine logic."""

    async def execute_with_retry(
        self,
        fn: Callable[..., Awaitable[Any]],
        config: RefineConfig | None = None,
        tracer: Tracer | None = None,
        step_name: str = "",
        flow_name: str = "",
    ) -> RetryResult:
        """Execute a callable with retry logic."""
        effective_config = config or RefineConfig(max_attempts=1)
        attempts: list[AttemptRecord] = []
        last_error: str = ""

        if tracer and effective_config.max_attempts > 1:
            tracer.emit(
                TraceEventType.REFINE_START,
                step_name=step_name,
                data={
                    "max_attempts": effective_config.max_attempts,
                    "backoff": effective_config.backoff,
                },
            )

        for attempt_num in range(1, effective_config.max_attempts + 1):
            try:
                # Build kwargs for the callable
                kwargs: dict[str, Any] = {}
                if (
                    attempt_num > 1
                    and effective_config.pass_failure_context
                    and last_error
                ):
                    kwargs["failure_context"] = last_error

                result = await fn(**kwargs)

                # Success!
                record = AttemptRecord(
                    attempt=attempt_num,
                    success=True,
                    result=result,
                )
                attempts.append(record)

                return RetryResult(
                    success=True,
                    result=result,
                    attempts=tuple(attempts),
                )

            except Exception as exc:
                last_error = str(exc)
                error_type = type(exc).__name__

                record = AttemptRecord(
                    attempt=attempt_num,
                    success=False,
                    error=last_error,
                    error_type=error_type,
                )
                attempts.append(record)

                if tracer:
                    tracer.emit_retry_attempt(
                        step_name=step_name,
                        attempt=attempt_num,
                        reason=last_error,
                        data={"error_type": error_type},
                    )

                # Apply backoff before next attempt
                if attempt_num < effective_config.max_attempts:
                    delay = self._compute_delay(
                        attempt_num, effective_config.backoff
                    )
                    if delay > 0:
                        await asyncio.sleep(delay)

        # All attempts exhausted
        exhausted_result = RetryResult(
            success=False,
            attempts=tuple(attempts),
            exhausted=True,
        )

        # Handle exhaustion action
        if effective_config.on_exhaustion == "skip":
            return exhausted_result

        # Default: raise RefineExhaustedError ("fallback" is not handled yet, so it also raises)
        raise RefineExhaustedError(
            message=(
                f"All {effective_config.max_attempts} refine attempts "
                f"exhausted for step '{step_name}'."
            )
        )

Backoff Strategies

None (Immediate)

backoff: "none"
# Delay: 0s between attempts
Use case: Fast retries for transient errors (rate limits, network blips).

Linear

backoff: "linear"
# Attempt 1: 0s
# Attempt 2: 1s delay
# Attempt 3: 2s delay
# Attempt 4: 3s delay
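Formula (attempt is the number of the attempt that just failed, so the delay before attempt 2 uses attempt = 1):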
delay = LINEAR_BASE_DELAY_S * attempt  # 1.0 * attempt
Use case: Moderate spacing for validation failures.

Exponential

backoff: "exponential"
# Attempt 1: 0s
# Attempt 2: 1s delay   (0.5 * 2^1)
# Attempt 3: 2s delay   (0.5 * 2^2)
# Attempt 4: 4s delay   (0.5 * 2^3)
# Attempt 5: 8s delay   (0.5 * 2^4)
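Formula (attempt is the number of the attempt that just failed, so the delay before attempt 2 uses attempt = 1):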
delay = EXPONENTIAL_BASE_DELAY_S * (EXPONENTIAL_MULTIPLIER ** attempt)
delay = 0.5 * (2.0 ** attempt)
Max cap: 30 seconds
Use case: Severe failures requiring model “cool-down” time.

Implementation

@staticmethod
def _compute_delay(attempt: int, strategy: str) -> float:
    """Compute the backoff delay for a given attempt number."""
    if strategy == "none":
        return 0.0

    if strategy == "linear":
        delay = LINEAR_BASE_DELAY_S * attempt
        return min(delay, MAX_DELAY_S)

    if strategy == "exponential":
        delay = EXPONENTIAL_BASE_DELAY_S * (EXPONENTIAL_MULTIPLIER ** attempt)
        return min(delay, MAX_DELAY_S)

    return 0.0
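
To see how the formulas and the 30-second cap interact, the following sketch reproduces `_compute_delay` as a free function and lists the delay slept after each failed attempt. The constant values are the ones quoted in the formulas above. `delay_schedule` is a hypothetical helper added for illustration.

```python
# Constant values as quoted in the formulas above.
LINEAR_BASE_DELAY_S = 1.0
EXPONENTIAL_BASE_DELAY_S = 0.5
EXPONENTIAL_MULTIPLIER = 2.0
MAX_DELAY_S = 30.0


def compute_delay(attempt: int, strategy: str) -> float:
    """Free-function copy of RetryEngine._compute_delay."""
    if strategy == "linear":
        return min(LINEAR_BASE_DELAY_S * attempt, MAX_DELAY_S)
    if strategy == "exponential":
        delay = EXPONENTIAL_BASE_DELAY_S * (EXPONENTIAL_MULTIPLIER ** attempt)
        return min(delay, MAX_DELAY_S)
    return 0.0  # "none" and any unrecognized strategy


def delay_schedule(max_attempts: int, strategy: str) -> list[float]:
    """Hypothetical helper: delays slept after failed attempts 1..max_attempts-1."""
    return [compute_delay(n, strategy) for n in range(1, max_attempts)]


exponential = delay_schedule(8, "exponential")
linear = delay_schedule(5, "linear")
print(exponential)  # the cap applies from the sixth failed attempt onward
print(linear)
```

With these constants the exponential strategy reaches the cap after the sixth failed attempt (0.5 * 2^6 = 32 s, clamped to 30 s), while the linear strategy would only reach it after the thirtieth.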

Failure Context Injection

How It Works

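When pass_failure_context is true, the previous attempt's error message (str(exc)) is passed to the next attempt as the failure_context keyword argument, which the callable then forwards into the prompt: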
# Attempt 1
response = await fn()  # No failure_context

# Validation fails: "Confidence 0.72 is below floor 0.85"

# Attempt 2
response = await fn(failure_context="Confidence 0.72 is below floor 0.85")
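
The flow above can be exercised end to end with a stripped-down loop. This is a minimal sketch, not the engine itself: `run_with_context` is a hypothetical stand-in that keeps only the context-passing logic of `execute_with_retry` (no backoff, tracing, or attempt records), and `flaky_step` is an illustrative callable that fails once.

```python
import asyncio

calls: list[str] = []  # the failure_context value seen on each call


async def flaky_step(failure_context: str = "") -> str:
    """Illustrative step: fails on the first call, succeeds afterwards."""
    calls.append(failure_context)
    if len(calls) == 1:
        raise ValueError("Confidence 0.72 is below floor 0.85")
    return "ok"


async def run_with_context(fn, max_attempts: int = 3):
    """Hypothetical, simplified stand-in for execute_with_retry."""
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        # Same rule as the engine: only pass context after a failure with a message.
        kwargs = {"failure_context": last_error} if attempt > 1 and last_error else {}
        try:
            return await fn(**kwargs)
        except Exception as exc:
            last_error = str(exc)
    raise RuntimeError(f"exhausted after {max_attempts} attempts: {last_error}")


result = asyncio.run(run_with_context(flaky_step))
```

Note that the callable must accept a `failure_context` keyword with a default value, since the first attempt is called with no arguments. An exception with an empty message produces no context, because the engine checks that `last_error` is non-empty.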

Model Client Integration

The ModelClient.call() method receives the failure context:
response = await self._client.call(
    system_prompt=unit.system_prompt,
    user_prompt=user_prompt,
    failure_context=failure_context,  # ← Injected here
)

Backend Handling

Backends append failure context to the user prompt:
# Anthropic example
if failure_context:
    user_prompt += (
        f"\n\n[PREVIOUS ATTEMPT FAILED]\n"
        f"Reason: {failure_context}\n"
        f"Please correct these issues in your response."
    )
Result: The model sees why it failed and can self-correct.

Attempt Records

AttemptRecord

@dataclass(frozen=True)
class AttemptRecord:
    """Record of a single execution attempt."""
    attempt: int       # 1-based attempt number
    success: bool      # Whether this attempt succeeded
    result: Any = None # The result (if successful)
    error: str = ""    # Error message (if failed)
    error_type: str = "" # Error class name (if failed)

RetryResult

@dataclass(frozen=True)
class RetryResult:
    """Aggregate result of a retry sequence."""
    success: bool
    result: Any = None
    attempts: tuple[AttemptRecord, ...] = ()
    exhausted: bool = False
Usage:
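# on_exhaustion="skip" returns the exhausted result; with the default ("") the
# engine raises RefineExhaustedError and the else branch below is never reached.
retry_result = await retry_engine.execute_with_retry(
    fn=execute_step,
    config=RefineConfig(max_attempts=3, backoff="exponential", on_exhaustion="skip"),
)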

if retry_result.success:
    print(f"Succeeded on attempt {len(retry_result.attempts)}")
else:
    print(f"Exhausted after {len(retry_result.attempts)} attempts")
    for attempt in retry_result.attempts:
        print(f"  Attempt {attempt.attempt}: {attempt.error}")

Example: Self-Healing Validation

Scenario

A step requires structured output with specific fields, but the model omits one.

Attempt 1

Model Output:
{
  "parties": "Acme Corp, Beta LLC",
  "effective_date": "2024-01-15"
}
Validation: ❌ Missing field termination_clause

Attempt 2 (with failure context)

Injected Context:
[PREVIOUS ATTEMPT FAILED]
Reason: Missing required fields: ['termination_clause']. Present fields: ['parties', 'effective_date'].
Please correct these issues in your response.
Model Output:
{
  "parties": "Acme Corp, Beta LLC",
  "effective_date": "2024-01-15",
  "termination_clause": "Either party may terminate with 30 days notice"
}
Validation: ✅ Success
Result: Self-healing without human intervention.
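
A validator that produces the failure message used in this scenario might look like the sketch below. The `ValidationError` class here is a local stand-in for the error type named on this page, and `validate_fields` and `REQUIRED_FIELDS` are hypothetical names. The real semantic validator may be structured differently.

```python
class ValidationError(Exception):
    """Local stand-in for the validation error type named on this page."""


# Field names taken from the contract-extraction scenario above.
REQUIRED_FIELDS = ("parties", "effective_date", "termination_clause")


def validate_fields(output: dict, required: tuple[str, ...] = REQUIRED_FIELDS) -> None:
    """Raise ValidationError listing missing and present fields, else return None."""
    missing = [name for name in required if name not in output]
    if missing:
        raise ValidationError(
            f"Missing required fields: {missing}. Present fields: {list(output)}."
        )


first_attempt = {"parties": "Acme Corp, Beta LLC", "effective_date": "2024-01-15"}
try:
    validate_fields(first_attempt)
    message = ""
except ValidationError as exc:
    message = str(exc)  # this string becomes the failure_context for attempt 2
```

Listing both the missing and the present fields gives the model a concrete target for the correction rather than a generic "validation failed".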

Exhaustion Handling

on_exhaustion: "" (Default - Raise)

on_exhaustion: ""
Raises RefineExhaustedError when all attempts fail:
try:
    result = await retry_engine.execute_with_retry(...)
except RefineExhaustedError as exc:
    print(f"All attempts exhausted: {exc.message}")

on_exhaustion: "skip"

on_exhaustion: "skip"
Returns RetryResult(success=False, exhausted=True) without raising:
result = await retry_engine.execute_with_retry(...)
if result.exhausted:
    print("Step skipped after exhaustion")
Use case: Non-critical steps that can be skipped.
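
With "skip", the returned `RetryResult` carries `result=None`, so callers need to decide what a skipped step means downstream. One possible caller-side pattern is sketched below. `result_or_default` is a hypothetical helper, and the `RetryResult` class is a local copy of the dataclass documented on this page.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class RetryResult:
    """Local copy of the documented dataclass, so the sketch is self-contained."""
    success: bool
    result: Any = None
    attempts: tuple = ()
    exhausted: bool = False


def result_or_default(retry_result: RetryResult, default: Any) -> Any:
    """Hypothetical helper: substitute a default when the step did not succeed."""
    if retry_result.exhausted or not retry_result.success:
        return default
    return retry_result.result


skipped = RetryResult(success=False, exhausted=True)
summary = result_or_default(skipped, default="(summary unavailable)")
```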

on_exhaustion: "fallback"

on_exhaustion: fallback("default_response")
Status: Planned for Phase 3 expansion.
Planned behavior: Execute a fallback flow or return a default value.

Trace Integration

Every retry is fully traced:

Refine Start

tracer.emit(
    TraceEventType.REFINE_START,
    step_name="Extract",
    data={
        "max_attempts": 3,
        "backoff": "exponential",
    },
)

Retry Attempts

tracer.emit_retry_attempt(
    step_name="Extract",
    attempt=2,
    reason="Missing required fields: ['termination_clause']",
    data={"error_type": "ValidationError"},
)

Example Trace

{
  "event_type": "refine_start",
  "step_name": "Extract",
  "data": {"max_attempts": 3, "backoff": "exponential"}
},
{
  "event_type": "retry_attempt",
  "step_name": "Extract",
  "data": {
    "attempt": 1,
    "reason": "Confidence 0.72 is below floor 0.85",
    "error_type": "ValidationError"
  }
},
{
  "event_type": "step_end",
  "step_name": "Extract",
  "data": {"success": true, "total_attempts": 2}
}

Configuration Examples

Conservative (Fast Retry)

refine {
  max_attempts: 2
  pass_failure_context: true
  backoff: none
  on_exhaustion: raise ValidationError
}
Use case: Quick fixes for simple validation failures.

Aggressive (Deep Healing)

refine {
  max_attempts: 5
  pass_failure_context: true
  backoff: exponential
  on_exhaustion: raise RefineExhaustedError
}
Use case: Complex validation with multiple potential issues.

Graceful Degradation

refine {
  max_attempts: 3
  pass_failure_context: true
  backoff: linear
  on_exhaustion: skip
}
Use case: Non-critical steps that can be omitted on failure.
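
When choosing between these presets it helps to know the worst-case time spent sleeping if every attempt fails. The sketch below sums the backoff delays for the three configurations above, using the documented formulas and the 30-second cap. `worst_case_wait` is a hypothetical helper, and the totals exclude the time taken by the model calls themselves.

```python
MAX_DELAY_S = 30.0  # documented cap


def delay(attempt: int, backoff: str) -> float:
    """Backoff delay after a failed attempt, per the documented formulas."""
    if backoff == "linear":
        return min(1.0 * attempt, MAX_DELAY_S)
    if backoff == "exponential":
        return min(0.5 * 2.0 ** attempt, MAX_DELAY_S)
    return 0.0


def worst_case_wait(max_attempts: int, backoff: str) -> float:
    """Total sleep if every attempt fails; no sleep follows the final attempt."""
    return sum(delay(n, backoff) for n in range(1, max_attempts))


# (max_attempts, backoff) for the three presets above.
presets = {
    "conservative": (2, "none"),
    "aggressive": (5, "exponential"),
    "graceful": (3, "linear"),
}
totals = {name: worst_case_wait(*cfg) for name, cfg in presets.items()}
print(totals)
```

Under these assumptions the aggressive preset adds 1 + 2 + 4 + 8 = 15 seconds of waiting in the worst case, the graceful preset adds 1 + 2 = 3 seconds, and the conservative preset adds none.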

Integration with Executor

The Executor always routes through the RetryEngine:
async def _execute_step(...) -> StepResult:
    # Build the step callable
    async def run_step(failure_context: str = "") -> StepResult:
        response = await self._call_model(..., failure_context=failure_context)
        # CPS validation chain...
        return StepResult(...)

    # Extract refine config from step metadata
    refine_config = self._extract_refine_config(step)

    # Execute with retry (even if max_attempts=1)
    retry_result = await self._retry_engine.execute_with_retry(
        fn=run_step,
        config=refine_config or RefineConfig(max_attempts=1),
        tracer=tracer,
        step_name=step.step_name,
    )

    return retry_result.result
Key Insight: Even steps without explicit refine blocks go through the engine (with max_attempts=1).

Error Types Handled

The RetryEngine catches every Exception subclass (via except Exception) and treats it as retryable:
  • ValidationError — Semantic validation failure
  • AnchorBreachError — Anchor constraint violation
  • ModelCallError — LLM API failure
  • ConfidenceError — Confidence below threshold
  • Any Exception — Generic error
Behavior: Record error, inject context, retry.
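
One consequence of using `except Exception` in the engine loop is that exceptions deriving directly from `BaseException` are not retried. In Python 3.8 and later this includes `asyncio.CancelledError`, so cancelling a task mid-retry propagates immediately. The small check below illustrates the distinction. `is_retried` is a hypothetical helper, not part of the engine.

```python
import asyncio


def is_retried(exc_type: type[BaseException]) -> bool:
    """True if an `except Exception` clause would catch this exception type."""
    return issubclass(exc_type, Exception)


print(is_retried(ValueError))              # ordinary errors are caught and retried
print(is_retried(asyncio.CancelledError))  # task cancellation is not (Python 3.8+)
print(is_retried(KeyboardInterrupt))       # Ctrl-C is not
```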

Next Steps

Semantic Validator

See what triggers validation failures

Tracer

Understand retry observability

Executor

Review the full execution pipeline
