
Overview

The ank-replay module provides utilities to feed protocol implementations with historical or exogenous events (e.g., chain logs, price updates) during backtests. Key features:
  • CSV-based event ingestion
  • Protocol routing (target-based dispatch)
  • Timestamp-filtered replay
  • Historical event application to protocol registries

Types

ChainEvent

A single chain/exogenous event delivered to a protocol. The payload schema is protocol-specific and typically decoded inside the protocol’s Protocol::apply_historical implementation.

Fields

ts
Timestamp
Event timestamp (seconds or milliseconds).
market_hint
Option<String>
Optional market-routing hint (e.g., "WETH/USDC:500").
payload
serde_json::Value
Opaque JSON payload (protocol-defined schema).

RoutedCsvRow

Routed event row for CSV I/O, including the protocol target.

Fields

ts
u64
Timestamp.
target
String
Destination protocol id (must match Protocol::id() of a registered instance).
market_hint
Option<String>
Optional market hint for the protocol.
payload_json
String
JSON string payload (e.g., {"kind":"swap_exact_in", ...}).

EventIngestor

Minimal helper to apply a ChainEvent to a protocol.

Methods

apply
fn apply<P: Protocol>(p: &mut P, ev: ChainEvent) -> Result<()>
Calls Protocol::apply_historical with the event fields.

Functions

load_routed_csv

Load a routed CSV into a vector of (target, ChainEvent) pairs. Signature:
pub fn load_routed_csv(path: &str) -> Result<Vec<(String, ChainEvent)>>
CSV Format: The CSV must have headers: ts,target,market_hint,payload_json
ts
u64
Timestamp (seconds or milliseconds)
target
String
Protocol id (must exist in the registry)
market_hint
String
Optional market identifier (may be empty)
payload_json
String
JSON-encoded payload (protocol-specific schema)
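To make the column layout concrete, the following is a minimal std-only sketch of how one routed CSV line could be split into its four fields. It assumes RFC 4180-style quoting (the payload field is wrapped in double quotes and inner quotes are doubled), as in the sample rows on this page. `split_csv_line` and `parse_routed_line` are hypothetical helpers, not ank-replay functions, and the payload is left as a raw JSON string rather than decoded.

```rust
/// Split one CSV line into fields, honoring double-quoted fields in which
/// a literal quote is written as two quotes (""). Hypothetical helper.
fn split_csv_line(line: &str) -> Vec<String> {
    let mut fields = Vec::new();
    let mut cur = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();
    while let Some(c) = chars.next() {
        match c {
            '"' if in_quotes && chars.peek() == Some(&'"') => {
                // Doubled quote inside a quoted field: emit a single quote.
                chars.next();
                cur.push('"');
            }
            // Any other quote opens or closes a quoted field.
            '"' => in_quotes = !in_quotes,
            // A comma outside quotes ends the current field.
            ',' if !in_quotes => fields.push(std::mem::take(&mut cur)),
            other => cur.push(other),
        }
    }
    fields.push(cur);
    fields
}

/// Parse a line into (ts, target, market_hint, payload_json).
/// An empty market_hint column becomes None.
fn parse_routed_line(line: &str) -> Result<(u64, String, Option<String>, String), String> {
    let f = split_csv_line(line);
    if f.len() != 4 {
        return Err(format!("expected 4 columns, found {}", f.len()));
    }
    let ts = f[0]
        .parse::<u64>()
        .map_err(|e| format!("bad ts {:?}: {}", f[0], e))?;
    let hint = if f[2].is_empty() { None } else { Some(f[2].clone()) };
    Ok((ts, f[1].clone(), hint, f[3].clone()))
}
```

In practice a CSV library would handle the quoting; the sketch only shows what the quoting rules imply for each column.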

apply_to_registry

Apply a batch of routed events to a protocol registry. Signature:
pub fn apply_to_registry(
    registry: &mut IndexMap<String, Box<dyn Protocol>>,
    events: &[(String, ChainEvent)],
) -> Result<()>
Each (target, ChainEvent) is looked up by target in registry and dispatched to Protocol::apply_historical. Parameters:
registry
&mut IndexMap<String, Box<dyn Protocol>>
Protocol registry (protocol id → protocol instance)
events
&[(String, ChainEvent)]
Events to apply (target, event) pairs
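The target-based dispatch described above can be sketched with standard-library types only. Everything here is a stand-in: `Event` replaces ChainEvent with a raw string payload, the `Protocol` trait is reduced to one method, `HashMap` is swapped in for IndexMap, and `dispatch_all` is a hypothetical name. Treating an unknown target as an error is an assumption of this sketch; the source only states that the target must exist in the registry.

```rust
use std::cell::Cell;
use std::collections::HashMap;
use std::rc::Rc;

/// Stand-in for ChainEvent; the payload is a raw string so the sketch
/// needs no external crates.
struct Event {
    ts: u64,
    market_hint: Option<String>,
    payload: String,
}

/// Stand-in for the Protocol trait, reduced to the method used here.
trait Protocol {
    fn apply_historical(
        &mut self,
        ts: u64,
        market_hint: Option<String>,
        payload: String,
    ) -> Result<(), String>;
}

/// Toy protocol that counts received events through a shared counter,
/// so callers can observe it after it has been boxed.
struct Counter {
    seen: Rc<Cell<usize>>,
}

impl Protocol for Counter {
    fn apply_historical(
        &mut self,
        _ts: u64,
        _market_hint: Option<String>,
        _payload: String,
    ) -> Result<(), String> {
        self.seen.set(self.seen.get() + 1);
        Ok(())
    }
}

/// Look up each target in the registry and forward the event, stopping
/// at the first failure (fail-fast). Unknown targets are reported as
/// errors, which is an assumption of this sketch.
fn dispatch_all(
    registry: &mut HashMap<String, Box<dyn Protocol>>,
    events: &[(String, Event)],
) -> Result<(), String> {
    for (i, (target, ev)) in events.iter().enumerate() {
        let proto = registry
            .get_mut(target)
            .ok_or_else(|| format!("event #{}: unknown target {:?}", i, target))?;
        proto
            .apply_historical(ev.ts, ev.market_hint.clone(), ev.payload.clone())
            .map_err(|e| format!("event #{} (target {}, ts {}): {}", i, target, ev.ts, e))?;
    }
    Ok(())
}
```

Because the loop returns on the first error, events after a failing one are not applied, and events before it are not rolled back.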

apply_rows_at_ts

Apply only those events whose ts equals at_ts. Signature:
pub fn apply_rows_at_ts(
    registry: &mut IndexMap<String, Box<dyn Protocol>>,
    events: &[(String, ChainEvent)],
    at_ts: u64,
) -> Result<()>
Useful for stepwise replays where you feed the registry per-tick/per-block. Parameters:
at_ts
u64
Timestamp filter (only events at this timestamp will be applied)
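Because the filter is an exact equality on ts, a replay driven by a fixed-step clock only picks up events that land exactly on a tick. The sketch below, using hypothetical helpers `clock_ticks` and `off_grid`, shows one way to check a log for events that such a clock would never match before starting a run.

```rust
/// Timestamps visited by a fixed-step clock: start, start + step, ...
fn clock_ticks(start: u64, step: u64, n_ticks: u64) -> Vec<u64> {
    (0..n_ticks).map(|i| start + i * step).collect()
}

/// Event timestamps that no tick matches exactly, and that an
/// equality-based filter would therefore never select.
fn off_grid(event_ts: &[u64], ticks: &[u64]) -> Vec<u64> {
    event_ts
        .iter()
        .copied()
        .filter(|ts| !ticks.contains(ts))
        .collect()
}
```

If `off_grid` returns anything, either iterate over the event timestamps themselves (as in the tick-by-tick example on this page) or snap event timestamps to the clock grid before replaying.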

dump_routed_csv

Write routed CSV rows to a file (with headers). Signature:
pub fn dump_routed_csv(path: &str, rows: &[RoutedCsvRow]) -> Result<()>

to_routed_csv_rows

Convert (target, ChainEvent) pairs to CSV rows (stringifying payloads). Signature:
pub fn to_routed_csv_rows(events: &[(String, ChainEvent)]) -> Vec<RoutedCsvRow>

Usage Examples

Loading and replaying events

use indexmap::IndexMap;
use ank_protocol::Protocol;
use ank_replay::{load_routed_csv, apply_to_registry};

let mut registry: IndexMap<String, Box<dyn Protocol>> = IndexMap::new();
// ... populate registry with protocol instances ...

// Load events from CSV
let events = load_routed_csv("events.csv")?;

// Apply all events to protocols
apply_to_registry(&mut registry, &events)?;

Tick-by-tick replay

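use indexmap::IndexMap;
use ank_protocol::Protocol;
use ank_replay::{load_routed_csv, apply_rows_at_ts};

let mut registry: IndexMap<String, Box<dyn Protocol>> = IndexMap::new();
// ... populate registry with protocol instances ...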

// Load all events
let events = load_routed_csv("events.csv")?;

// Get unique timestamps
let mut timestamps: Vec<u64> = events.iter()
    .map(|(_, ev)| ev.ts)
    .collect();
timestamps.sort_unstable();
timestamps.dedup();

// Replay tick by tick
for ts in timestamps {
    println!("Processing tick @ {}", ts);
    apply_rows_at_ts(&mut registry, &events, ts)?;
    
    // Process each protocol state after events
    for (id, protocol) in registry.iter_mut() {
        let view = protocol.view_market();
        println!("Protocol {}: {:?}", id, view);
    }
}

Creating an events CSV

Create a CSV file with the following format:
ts,target,market_hint,payload_json
1640000000,uniswap-v3,WETH/USDC:500,"{""kind"":""swap_exact_in"",""zero_for_one"":true,""amount_in_e18"":""1000000000000000000""}"
1640003600,aave-v3,,"{""kind"":""supply"",""asset"":""WETH"",""amount_e18"":""5000000000000000000""}"
1640007200,uniswap-v3,WETH/USDC:500,"{""kind"":""mint"",""tick_lower"":-887220,""tick_upper"":887220,""liquidity"":""1000000000000000000""}"
Tips:
  • Wrap the payload_json column in double quotes and escape inner quotes by doubling them ("")
  • Leave market_hint empty if not needed
  • Use consistent protocol target ids
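When generating rows by hand rather than through dump_routed_csv, the quoting tip above can be applied with a small helper. This is a sketch under stated assumptions: `csv_quote` and `routed_line` are hypothetical names, and only the payload column is quoted, so target and market_hint are assumed to contain no commas, quotes, or newlines.

```rust
/// Quote a value for a CSV field: wrap it in double quotes and double
/// every quote inside it. Hypothetical helper, not an ank-replay function.
fn csv_quote(field: &str) -> String {
    format!("\"{}\"", field.replace('"', "\"\""))
}

/// Build one routed CSV line in the column order ts,target,market_hint,payload_json.
/// An absent market hint is written as an empty column.
fn routed_line(ts: u64, target: &str, market_hint: Option<&str>, payload_json: &str) -> String {
    format!(
        "{},{},{},{}",
        ts,
        target,
        market_hint.unwrap_or(""),
        csv_quote(payload_json)
    )
}
```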

Exporting events to CSV

use ank_replay::{to_routed_csv_rows, dump_routed_csv, ChainEvent};
use serde_json::json;

let events = vec![
    (
        "uniswap-v3".to_string(),
        ChainEvent {
            ts: 1640000000,
            market_hint: Some("WETH/USDC:500".into()),
            payload: json!({
                "kind": "swap_exact_in",
                "zero_for_one": true,
                "amount_in_e18": "1000000000000000000"
            }),
        },
    ),
];

// Convert to CSV rows
let rows = to_routed_csv_rows(&events);

// Write to file
dump_routed_csv("output_events.csv", &rows)?;

Advanced: Custom event filtering

use ank_replay::load_routed_csv;

// Load events
let all_events = load_routed_csv("events.csv")?;

// Filter by target protocol
let uniswap_events: Vec<_> = all_events.iter()
    .filter(|(target, _)| target == "uniswap-v3")
    .collect();

// Filter by time range
let time_filtered: Vec<_> = all_events.iter()
    .filter(|(_, ev)| ev.ts >= 1640000000 && ev.ts < 1641000000)
    .collect();

// Filter by payload content
let swaps: Vec<_> = all_events.iter()
    .filter(|(_, ev)| {
        ev.payload.get("kind")
            .and_then(|k| k.as_str())
            .map(|k| k == "swap_exact_in")
            .unwrap_or(false)
    })
    .collect();

CSV Format Details

Headers

Required: ts,target,market_hint,payload_json

Field Specifications

ts (required):
  • Type: u64
  • Format: Unix timestamp (seconds or milliseconds)
  • Example: 1640000000
target (required):
  • Type: String
  • Format: Protocol identifier matching Protocol::id()
  • Example: uniswap-v3, aave-v3, compound-v2
market_hint (optional):
  • Type: String or empty
  • Format: Protocol-specific market identifier
  • Examples:
    • Uniswap V3: WETH/USDC:500 (pair + fee tier)
    • Aave: (typically empty or asset symbol)
    • Compound: cETH, cDAI
payload_json (required):
  • Type: JSON string
  • Format: Protocol-specific event payload
  • Must be valid JSON
  • When embedded in CSV, the field must be wrapped in double quotes and inner quotes escaped by doubling them ("")

Example Payloads

Uniswap V3 Swap:
{
  "kind": "swap_exact_in",
  "zero_for_one": true,
  "amount_in_e18": "1000000000000000000",
  "min_out": "0"
}
Aave V3 Supply:
{
  "kind": "supply",
  "asset": "WETH",
  "amount_e18": "5000000000000000000"
}
Aave V3 Borrow:
{
  "kind": "borrow",
  "asset": "USDC",
  "amount_e18": "10000000000000000000000"
}
Uniswap V3 Mint:
{
  "kind": "mint",
  "tick_lower": -887220,
  "tick_upper": 887220,
  "liquidity": "1000000000000000000"
}

Implementation Notes

Protocol Dispatch

The replay system uses the Protocol trait’s apply_historical method:
trait Protocol {
    fn apply_historical(
        &mut self,
        ts: Timestamp,
        market_hint: Option<String>,
        payload: serde_json::Value,
    ) -> Result<()>;
}
Each protocol implementation is responsible for:
  1. Parsing the payload JSON
  2. Validating the event schema
  3. Applying state changes
  4. Handling errors gracefully
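The four responsibilities above can be sketched with a toy protocol. Everything here is hypothetical: `ToyPool` is not a real ank protocol, `Payload` is a flat string map standing in for serde_json::Value so the example compiles without external crates, and errors are plain strings rather than the crate's Result type.

```rust
use std::collections::HashMap;

/// Flat key/value stand-in for a JSON object payload.
type Payload = HashMap<String, String>;

/// Toy lending pool that tracks supplied balances per asset.
#[derive(Default)]
struct ToyPool {
    supplied: HashMap<String, u128>,
}

impl ToyPool {
    fn apply_historical(
        &mut self,
        ts: u64,
        _market_hint: Option<String>,
        payload: Payload,
    ) -> Result<(), String> {
        // 1. Parse: pull out the fields this protocol understands.
        let kind = payload
            .get("kind")
            .ok_or_else(|| format!("ts {}: missing \"kind\"", ts))?;
        // 2. Validate: reject unknown kinds and malformed amounts
        //    before touching any state.
        if kind.as_str() != "supply" {
            return Err(format!("ts {}: unsupported kind {:?}", ts, kind));
        }
        let asset = payload
            .get("asset")
            .ok_or_else(|| format!("ts {}: missing \"asset\"", ts))?;
        let amount = payload
            .get("amount_e18")
            .ok_or_else(|| format!("ts {}: missing \"amount_e18\"", ts))?
            .parse::<u128>()
            .map_err(|e| format!("ts {}: bad amount_e18: {}", ts, e))?;
        // 3. Apply: mutate state only after every check has passed.
        let bal = self.supplied.entry(asset.clone()).or_insert(0);
        *bal = bal
            .checked_add(amount)
            .ok_or_else(|| format!("ts {}: balance overflow", ts))?;
        // 4. Every error above names the timestamp, so a failed replay
        //    can be traced back to the offending row.
        Ok(())
    }
}
```

Validating before mutating means a rejected event leaves the protocol state unchanged, which keeps a fail-fast replay easy to reason about.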

Timestamp Normalization

Timestamp values greater than 100 billion (100,000,000,000) are treated as milliseconds and automatically converted to seconds, so all events share one unit.
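A minimal sketch of this rule, assuming a strict greater-than comparison against the threshold and truncating integer division (neither detail is confirmed by the source). `MS_THRESHOLD` and `normalize_ts` are hypothetical names.

```rust
/// Values above this are treated as milliseconds (assumed threshold
/// of 100 billion, compared with a strict greater-than).
const MS_THRESHOLD: u64 = 100_000_000_000;

/// Convert a millisecond timestamp to seconds; leave second-based
/// timestamps unchanged. Sub-second precision is discarded.
fn normalize_ts(ts: u64) -> u64 {
    if ts > MS_THRESHOLD {
        ts / 1000
    } else {
        ts
    }
}
```

Under these assumptions, 1640000000 and 1640000000123 normalize to the same second, so rows recorded in different units can still line up on one tick.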

Error Handling

If an event cannot be applied:
  • apply_to_registry returns an error immediately (fail-fast)
  • Protocol implementations should return descriptive errors
  • Use anyhow::Context to add event context

Performance Considerations

  • Events are loaded entirely into memory
  • For large event logs (> 1M events), consider chunked processing
  • Pre-sort events by timestamp for sequential replay
  • Use apply_rows_at_ts for tick-by-tick processing; it operates on an already-loaded slice, so combine it with chunked loading if memory is the constraint
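For long logs, one option is to group events by timestamp once instead of filtering the whole slice on every tick. The source does not say how apply_rows_at_ts is implemented; if it scans the slice on each call (an assumption), a replay over T ticks and N events costs on the order of T × N comparisons, whereas a one-pass index avoids the rescans. `index_by_ts` is a hypothetical helper that works on the timestamps alone.

```rust
use std::collections::BTreeMap;

/// Group event indices by timestamp in a single pass. BTreeMap iterates
/// in ascending key order, so ticks come out already sorted and each
/// tick's events keep their original relative order.
fn index_by_ts(timestamps: &[u64]) -> BTreeMap<u64, Vec<usize>> {
    let mut index: BTreeMap<u64, Vec<usize>> = BTreeMap::new();
    for (i, ts) in timestamps.iter().enumerate() {
        index.entry(*ts).or_default().push(i);
    }
    index
}
```

Each entry maps a tick to the positions of its events in the loaded vector, so a replay loop can walk the map and apply only those positions.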
