context-bench runs every benchmark as a four-stage pipeline. Each stage transforms data and passes it to the next.

Pipeline overview

Dataset (iterable of dicts)


┌──────────┐     ┌───────────┐     ┌──────────┐
│  System  │────▶│ Evaluator │────▶│  Metric  │
│ .process │     │  .score   │     │ .compute │
└──────────┘     └───────────┘     └──────────┘
     │                 │                 │
     ▼                 ▼                 ▼
output dict       scores dict      summary dict

1. Dataset yields examples

The dataset is any Iterable[dict]. Each dict must have an "id" key and a "context" key. Built-in loaders cover 42 datasets; you can also pass a plain list of dicts or a local JSONL file.

2. System transforms each example

System.process(example) receives the raw dict and returns a new dict. This is the thing you are benchmarking — a compressor, a proxy, a memory manager, or anything else that touches context.

3. Evaluator scores the transformation

Evaluator.score(original, processed) receives both the input and output dicts and returns a dict[str, float] of named scores such as f1, mc_accuracy, or pass_at_1.

4. Metric aggregates scores

Metric.compute(rows) receives the full list of EvalRow objects for a system and returns summary statistics such as mean score, pass rate, or compression ratio.

Stage details

Dataset

A dataset is any Iterable[dict[str, Any]]. The minimum required keys are:
Key        Type        Description
id         str | int   Unique identifier for the example
context    str         The context text passed to the system
Optional keys such as question, answer, choices, and dataset are used by specific evaluators.
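A minimal in-memory dataset is just a list of such dicts. The field values below are made up for illustration; only "id" and "context" are required:

```python
# A minimal dataset: any iterable of dicts with "id" and "context".
# "question" and "answer" are optional keys consumed by specific
# evaluators; the content here is illustrative only.
dataset = [
    {
        "id": "ex-1",
        "context": "The Eiffel Tower is in Paris.",
        "question": "Where is the Eiffel Tower?",
        "answer": "Paris",
    },
    {
        "id": "ex-2",
        "context": "Water boils at 100 degrees Celsius at sea level.",
        "question": "At what temperature does water boil at sea level?",
        "answer": "100 degrees Celsius",
    },
]
```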

System

A system implements .name (a string property) and .process(example: dict) -> dict. The returned dict should include a "response" key with the system’s output. The runner passes the full example dict so the system can access any field — context, question, prior turns, etc.
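A toy system satisfying this interface might look like the sketch below. The class and its truncation behavior are hypothetical; the only contract is .name and .process:

```python
from typing import Any


class TruncatingSystem:
    """Hypothetical system: truncates context to a fixed character budget.

    Satisfies the System protocol by exposing .name and .process().
    """

    def __init__(self, max_chars: int = 1000) -> None:
        self.max_chars = max_chars

    @property
    def name(self) -> str:
        return f"truncate-{self.max_chars}"

    def process(self, example: dict[str, Any]) -> dict[str, Any]:
        # Return a new dict and include a "response" key with the output.
        out = dict(example)
        out["context"] = example["context"][: self.max_chars]
        out["response"] = out["context"]
        return out
```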

Evaluator

An evaluator implements .name and .score(original, processed) -> dict[str, float]. It receives both the original example (before the system ran) and the processed dict (after). Evaluators are auto-wired based on which datasets you select — you do not configure them manually.
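Even though evaluators are auto-wired, a custom one is small to write. This is a hedged sketch: the exact-match logic and the assumed "answer"/"response" fields are illustrative, not a built-in evaluator:

```python
from typing import Any


class ExactMatch:
    """Hypothetical evaluator: 1.0 when the processed "response"
    matches the original "answer" (case-insensitive, whitespace-trimmed)."""

    @property
    def name(self) -> str:
        return "exact-match"

    def score(
        self, original: dict[str, Any], processed: dict[str, Any]
    ) -> dict[str, float]:
        gold = str(original.get("answer", "")).strip().lower()
        pred = str(processed.get("response", "")).strip().lower()
        return {"exact_match": 1.0 if gold and gold == pred else 0.0}
```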

Metric

A metric implements .name and .compute(rows: list[EvalRow]) -> dict[str, float]. It receives all rows for a single system and returns aggregate statistics. Multiple metrics can run on the same rows.
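A minimal metric sketch, using a stand-in dataclass in place of context-bench's real EvalRow (only the fields the metric reads are reproduced here):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class EvalRow:
    """Minimal stand-in for context-bench's EvalRow (fields trimmed)."""
    system: str
    example_id: str
    scores: dict[str, float]


class MeanF1:
    """Hypothetical metric: averages the "f1" score across all rows."""

    @property
    def name(self) -> str:
        return "mean_f1"

    def compute(self, rows: list[EvalRow]) -> dict[str, float]:
        values = [r.scores["f1"] for r in rows if "f1" in r.scores]
        return {"mean_f1": sum(values) / len(values) if values else 0.0}
```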

Result data structures

EvalRow

Each (system, example) pair produces one EvalRow:
@dataclass
class EvalRow:
    system: str                   # system name
    example_id: str | int         # from example["id"]
    scores: dict[str, float]      # all evaluator outputs
    input_tokens: int             # tokens in the input dict
    output_tokens: int            # tokens in the output dict
    metadata: dict[str, Any]      # API usage if present
    latency: float                # seconds to process
    dataset: str                  # dataset tag

EvalResult

evaluate() returns an EvalResult that collects all rows and summary statistics:
@dataclass
class EvalResult:
    rows: list[EvalRow]
    summary: dict[str, dict[str, float]]  # system -> metric -> value
    timing: dict[str, float]              # system -> wall-clock seconds
    config: dict[str, Any]                # run parameters
You can filter, export, or convert results:
result.filter(system="kompact")  # EvalResult for one system
result.to_json()                 # JSON string
result.to_dataframe()            # pandas DataFrame (requires pandas)

Data flow in code

The core evaluate() function in runner.py wires all four stages together:
from context_bench import evaluate
from context_bench.evaluators import AnswerQuality
from context_bench.metrics import MeanScore, PassRate, Latency

result = evaluate(
    systems=[my_system],
    dataset=my_dataset,
    evaluators=[AnswerQuality()],
    metrics=[
        MeanScore(score_field="f1"),
        PassRate(score_field="f1"),
        Latency(),
    ],
    max_workers=4,       # concurrent threads
    cache_dir=".cache/", # resume on re-run
)
print(result.summary)
Internally, evaluate() materializes the dataset, iterates over each system, calls _process_example() for every (system, example) pair, collects EvalRow objects, then runs each metric over the rows for that system.
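That control flow can be sketched as follows. This is a simplified illustration, not the real runner.py: caching, threading, token accounting, and EvalRow construction are omitted (rows are plain dicts here):

```python
import time
from typing import Any, Iterable


def evaluate_sketch(systems, dataset: Iterable[dict], evaluators, metrics):
    """Simplified sketch of the four-stage loop inside evaluate()."""
    examples = list(dataset)  # stage 1: materialize the dataset once
    rows_by_system: dict[str, list[dict[str, Any]]] = {}
    summary: dict[str, dict[str, float]] = {}

    for system in systems:
        rows = []
        for example in examples:
            start = time.perf_counter()
            processed = system.process(example)          # stage 2
            scores: dict[str, float] = {}
            for ev in evaluators:                        # stage 3
                scores.update(ev.score(example, processed))
            rows.append({
                "system": system.name,
                "example_id": example["id"],
                "scores": scores,
                "latency": time.perf_counter() - start,
            })
        rows_by_system[system.name] = rows
        summary[system.name] = {}
        for metric in metrics:                           # stage 4
            summary[system.name].update(metric.compute(rows))
    return rows_by_system, summary
```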

Registry

The Registry is a name-based plugin system for datasets, metrics, and reporters. It lets you register components by name and look them up in config-driven workflows:
from context_bench.registry import registry, register_dataset, load_dataset

# Register a custom dataset loader
registry.register("dataset", "my-data", my_loader_fn)

# Or use the convenience wrapper
register_dataset("my-data", my_loader_fn)

# Load by name (kwargs are forwarded to the loader function)
dataset = load_dataset("my-data", n=100)

# List available names
registry.list("dataset")
The CLI uses the registry to resolve --dataset names. Registering your own loader makes it available as a CLI flag.

Protocol-based design

All four interfaces — System, Evaluator, Metric, and MemorySystem — are typing.Protocol. This means:
  • You implement the required methods on any class you choose.
  • You never subclass a context-bench base class.
  • Structural typing applies: if your object has the right methods, it works.
Duck typing is intentional. context-bench does not use abstract base classes or inheritance. Implement the methods, not a class hierarchy.
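For illustration, the System protocol can be sketched like this (the real definitions live inside context-bench; the @runtime_checkable decoration here is an assumption for demonstration):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class System(Protocol):
    """Structural interface: any object with these members qualifies."""

    @property
    def name(self) -> str: ...

    def process(self, example: dict[str, Any]) -> dict[str, Any]: ...


class Passthrough:
    """Satisfies the protocol without subclassing anything."""

    name = "passthrough"

    def process(self, example: dict[str, Any]) -> dict[str, Any]:
        return {**example, "response": example["context"]}
```

Because typing.Protocol uses structural typing, isinstance(Passthrough(), System) holds even though Passthrough never names System.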
