
Overview

The cache utilities module provides two caching mechanisms:
  1. LRUCache: In-memory thread-safe LRU cache for embeddings and temporary data
  2. ExtractionSidecarCache: Persistent JSON sidecar cache for entity extraction results
Both caches use stable hashing to ensure deterministic keys and support concurrent access from worker threads. Source: src/utils/cache_utils.py, src/utils/extraction_cache.py

LRUCache

Overview

Thread-safe LRU (Least Recently Used) cache backed by an OrderedDict. Evicts the least-recently-used entry when max_items is reached. Thread Safety: All public methods are serialized with a lock, making the cache safe for concurrent access from extraction workers. Source: src/utils/cache_utils.py:13-70
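The OrderedDict-plus-lock pattern described above can be sketched as follows. This is a minimal illustration of the technique, not the module's actual code:

```python
from collections import OrderedDict
from threading import Lock

class MiniLRU:
    """Minimal sketch of an OrderedDict-backed, lock-serialized LRU cache."""

    def __init__(self, max_items: int):
        self._max = max(1, max_items)
        self._data = OrderedDict()
        self._lock = Lock()

    def get(self, key):
        with self._lock:
            if key not in self._data:
                return None
            self._data.move_to_end(key)  # promote to most-recently-used
            return self._data[key]

    def set(self, key, value):
        with self._lock:
            self._data[key] = value
            self._data.move_to_end(key)
            while len(self._data) > self._max:
                self._data.popitem(last=False)  # evict least-recently-used
```

Because every public method takes the same lock, interleaved calls from worker threads cannot corrupt the OrderedDict's internal ordering.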

Constructor

from src.utils.cache_utils import LRUCache

cache = LRUCache(max_items=4096)
Parameters:
  • max_items (int, required): Maximum number of items to store. When exceeded, the least-recently-used item is evicted. Minimum value is 1.

Methods

get

cache.get(key: K) -> Optional[V]
Return cached value or None on miss. Promotes key to most-recently-used on hit.
cache = LRUCache(max_items=1000)
vector = cache.get(("model:1024", "text_hash_abc123"))

if vector is None:
    vector = compute_embedding(text)
    cache.set(("model:1024", "text_hash_abc123"), vector)

set

cache.set(key: K, value: V) -> None
Insert or update key. Evicts LRU entry if over capacity.
cache.set("entity:123", {"name": "Ahmed", "nationality": "Yemeni"})

clear

cache.clear() -> None
Remove all entries and reset hit/miss counters.
cache.clear()
print(cache.stats)  # {'hits': 0, 'misses': 0, 'hit_rate': 0.0, 'size': 0}

stats

cache.stats -> dict
Return hit/miss statistics for diagnostics.
Returns:
  • stats (dict): Hit/miss statistics with keys 'hits', 'misses', 'hit_rate', and 'size'.
stats = cache.stats
print(f"Cache hit rate: {stats['hit_rate']:.1%}")
print(f"Cache size: {stats['size']}/{cache._max}")

Example: Embedding Cache

from typing import List

from src.utils.cache_utils import LRUCache, sha256_text

class SimpleEmbeddingCache:
    def __init__(self, max_items: int = 4096):
        self._cache = LRUCache(max_items=max_items)
    
    def get_or_compute(self, text: str, model: str) -> List[float]:
        # Build cache key from model fingerprint and text hash
        key = (model, sha256_text(text))
        
        # Try cache first
        cached = self._cache.get(key)
        if cached is not None:
            return cached
        
        # Compute and store
        vector = compute_embedding(text, model)
        self._cache.set(key, vector)
        return vector

# Usage
cache = SimpleEmbeddingCache()
vector = cache.get_or_compute("Abdul Rahman Ahmed", "jinaai/jina-embeddings-v3:1024")

Hashing Utilities

sha256_text

from src.utils.cache_utils import sha256_text

hash_value = sha256_text("Some text")
print(hash_value)  # "7a3d5c8f9e2b1a0d..."
Return the hex SHA-256 digest of a UTF-8 string.
Parameters:
  • text (str, required): Text to hash.
Returns:
  • hash (str): 64-character hexadecimal SHA-256 digest.

sha256_jsonable

from src.utils.cache_utils import sha256_jsonable

schema = {"type": "object", "properties": {"name": {"type": "string"}}}
hash_value = sha256_jsonable(schema)
print(hash_value)  # "a1b2c3d4e5f6..."
Return the hex SHA-256 of a JSON-serializable object. Determinism: Keys are sorted before hashing. Non-serializable objects fall back to str(obj) so this never raises.
Parameters:
  • obj (object, required): JSON-serializable object (dict, list, etc.).
Returns:
  • hash (str): 64-character hexadecimal SHA-256 digest.
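The determinism guarantee can be pictured with this hypothetical equivalent (illustrative only; the actual implementation lives in src/utils/cache_utils.py):

```python
import hashlib
import json

def sha256_jsonable_sketch(obj) -> str:
    # sort_keys gives a deterministic serialization regardless of dict
    # insertion order; default=str stringifies non-serializable objects
    # instead of raising
    payload = json.dumps(obj, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

# Same dict contents, different insertion order -> identical hash
a = sha256_jsonable_sketch({"b": 2, "a": 1})
b = sha256_jsonable_sketch({"a": 1, "b": 2})
assert a == b
```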
Example: Schema Hashing
from pydantic import BaseModel
from src.utils.cache_utils import sha256_jsonable

class Person(BaseModel):
    name: str
    nationality: str

schema_hash = sha256_jsonable(Person.model_json_schema())
print(f"Schema hash: {schema_hash[:16]}...")  # Use in cache key

ExtractionSidecarCache

Overview

Persistent JSON sidecar cache for entity extraction results. Stores extraction outputs keyed on all output-affecting inputs:
  • Content hash (article text)
  • Model name
  • Entity type
  • Prompt hash
  • Schema hash
  • Temperature
Cache Layout:
{base_dir}/{subdir}/v{version}/{key[0:2]}/{key[2:4]}/{key}.json
Version-Based Invalidation: Bumping cache.extraction.version in the domain config causes reads from the old vN/ directory to stop matching, effectively invalidating the entire cache without deleting files. Source: src/utils/extraction_cache.py
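The layout string above translates directly into a path, sketched here as a hypothetical helper (the module may build paths differently internally):

```python
from pathlib import Path

def cache_path(base_dir: str, subdir: str, version: int, key: str) -> Path:
    # Two-level fan-out on the first four hex characters of the key,
    # under a versioned directory that enables whole-cache invalidation
    return Path(base_dir) / subdir / f"v{version}" / key[0:2] / key[2:4] / f"{key}.json"
```

Bumping `version` changes the `v{version}` component, so every lookup lands in a fresh directory tree while old files remain on disk untouched.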

Constructor

from src.utils.extraction_cache import ExtractionSidecarCache

cache = ExtractionSidecarCache(
    base_dir="data/guantanamo",
    subdir="cache/extractions",
    version=1,
    enabled=True
)
Parameters:
  • base_dir (str, required): Base directory for cache files (typically the domain data directory).
  • subdir (str, default "cache/extractions"): Subdirectory within base_dir for the extraction cache.
  • version (int, default 1): Cache version. Changing this invalidates all existing cache entries. Typically loaded from cache.extraction.version in the domain config.
  • enabled (bool, default True): Whether caching is enabled. If False, read() always returns None and write() is a no-op.

Methods

make_key

key = cache.make_key(
    text="Article text...",
    system_prompt="Extract all people mentioned...",
    response_model=List[Person],
    model="gemini/gemini-2.0-flash-exp",
    entity_type="people",
    temperature=0.0
)
Build a deterministic hex cache key from all output-affecting inputs.
Parameters:
  • text (str, required): Article text being extracted from.
  • system_prompt (str, required): System prompt used for extraction.
  • response_model (Any, required): Pydantic model or List[Model] defining the extraction schema.
  • model (str, required): LLM model identifier (e.g., "gemini/gemini-2.0-flash-exp").
  • entity_type (str, required): Entity type: "people", "organizations", "locations", or "events".
  • temperature (float, required): Model temperature (affects output randomness).
Returns:
  • key (str): 64-character hexadecimal cache key.
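One way to picture how such a deterministic key could be assembled is sketched below. This is illustrative only; the real field order, separators, and serialization are defined in src/utils/extraction_cache.py:

```python
import hashlib

def _h(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()

def make_key_sketch(text: str, system_prompt: str, schema_hash: str,
                    model: str, entity_type: str, temperature: float) -> str:
    # Hash the large textual inputs, then hash the joined components so
    # the key is a fixed-length hex digest regardless of input size
    parts = [_h(text), _h(system_prompt), schema_hash,
             model, entity_type, repr(float(temperature))]
    return _h("|".join(parts))
```

The essential property is that every output-affecting input participates: change any one of them and the key, and therefore the cache file, changes.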

read

record = cache.read(key)
Return the cached record for key, or None on miss.
Parameters:
  • key (str, required): Cache key from make_key().
Returns:
  • record (Dict[str, Any]): Cached extraction record (see build_cache_record), or None if not found.

write

cache.write(key, record)
Atomically write record as JSON for key. Atomicity: Writes to a temp file in the same directory, then uses os.replace() (atomic on POSIX). Concurrent reads are safe — readers see either the old file or the new one, never a partial write.
Parameters:
  • key (str, required): Cache key from make_key().
  • record (Dict[str, Any], required): Cache record to write (see build_cache_record).
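The temp-file-then-replace pattern described above can be sketched like this (a hypothetical illustration of the technique, not the module's actual write()):

```python
import json
import os
import tempfile

def atomic_write_json(path: str, record: dict) -> None:
    dirname = os.path.dirname(path) or "."
    os.makedirs(dirname, exist_ok=True)
    # The temp file must live in the SAME directory so os.replace()
    # never crosses a filesystem boundary (atomic rename on POSIX)
    fd, tmp = tempfile.mkstemp(dir=dirname, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(record, f)
        os.replace(tmp, path)  # readers see the old file or the new one
    except BaseException:
        os.unlink(tmp)  # clean up the partial temp file on failure
        raise
```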

enabled

if cache.enabled:
    print("Cache is active")
Whether caching is enabled.

stats

stats = cache.stats
print(f"Cache hit rate: {stats['hit_rate']:.1%}")
print(f"Cache root: {stats['root']}")
Return cache statistics.
Returns:
  • stats (dict): Hit/miss statistics plus the cache root path.

Helper Functions

build_cache_record

from src.utils.extraction_cache import build_cache_record

record = build_cache_record(
    output=[{"name": "Ahmed", "nationality": "Yemeni"}],
    entity_type="people",
    model="gemini/gemini-2.0-flash-exp",
    temperature=0.0,
    content_hash="abc123...",
    prompt_hash="def456...",
    schema_hash="ghi789...",
    cache_version=1
)
Build the JSON-serializable record stored in the cache file.
Parameters:
  • output (Any, required): Extraction output (typically List[Dict[str, Any]] or a list of Pydantic models).
  • entity_type (str, required): Entity type being extracted.
  • model (str, required): LLM model identifier.
  • temperature (float, required): Model temperature.
  • content_hash (str, required): SHA-256 hash of the article text.
  • prompt_hash (str, required): SHA-256 hash of the system prompt.
  • schema_hash (str, required): SHA-256 hash of the response model schema.
  • cache_version (int, required): Cache version number.
Returns:
  • record (Dict[str, Any]): Cache record with metadata:
{
    "cache_version": 1,
    "entity_type": "people",
    "model": "gemini/gemini-2.0-flash-exp",
    "temperature": 0.0,
    "content_hash": "abc123...",
    "prompt_hash": "def456...",
    "schema_hash": "ghi789...",
    "created_at": "2026-03-01T12:00:00.000000+00:00",
    "output": [{"name": "Ahmed", "nationality": "Yemeni"}]
}

Complete Example

from typing import List
from pydantic import BaseModel
from src.utils.extraction_cache import ExtractionSidecarCache, build_cache_record
from src.utils.cache_utils import sha256_text

class Person(BaseModel):
    name: str
    nationality: str

# Initialize cache
cache = ExtractionSidecarCache(
    base_dir="data/guantanamo",
    version=1,
    enabled=True
)

# Extraction inputs
article_text = "Abdul Rahman Ahmed, a Yemeni national, was detained..."
system_prompt = "Extract all people mentioned in the article."
model = "gemini/gemini-2.0-flash-exp"
entity_type = "people"
temperature = 0.0

# Build cache key
key = cache.make_key(
    text=article_text,
    system_prompt=system_prompt,
    response_model=List[Person],
    model=model,
    entity_type=entity_type,
    temperature=temperature
)

# Check cache
cached = cache.read(key)
if cached:
    print(f"Cache hit! Found {len(cached['output'])} entities")
    entities = cached["output"]
else:
    print("Cache miss, calling LLM...")
    # Call LLM (pseudo-code)
    entities = extract_entities_from_llm(article_text, system_prompt, model)
    
    # Build and store cache record
    record = build_cache_record(
        output=entities,
        entity_type=entity_type,
        model=model,
        temperature=temperature,
        content_hash=sha256_text(article_text),
        prompt_hash=sha256_text(system_prompt),
        schema_hash=sha256_text(str(Person.model_json_schema())),  # sha256_jsonable(...) is the key-order-stable alternative
        cache_version=1
    )
    cache.write(key, record)

print(f"Entities: {entities}")
print(f"Cache stats: {cache.stats}")

Configuration

Domain Config

Cache settings are typically loaded from domain YAML:
configs/guantanamo/config.yaml
cache:
  enabled: true
  
  embeddings:
    lru_max_items: 4096  # In-memory LRU cache for embeddings
  
  extraction:
    version: 1  # Bump to invalidate all extraction cache entries
    enabled: true

Accessing Config

from src.config_loader import DomainConfig

dc = DomainConfig("guantanamo")
cache_config = dc.get_cache_config()

print(f"Cache enabled: {cache_config.get('enabled')}")
print(f"Extraction version: {cache_config.get('extraction', {}).get('version')}")
print(f"LRU max items: {cache_config.get('embeddings', {}).get('lru_max_items')}")

Performance Considerations

Embedding Cache:
  • Each embedding vector is ~4KB (1024 dimensions × 4 bytes/float)
  • Default 4096 items ≈ 16MB memory
  • Increase for domains with many unique entity names
  • Decrease for memory-constrained environments
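The sizing rule of thumb above is simple arithmetic, shown here with the default settings:

```python
bytes_per_vector = 1024 * 4           # 1024 float32 dimensions x 4 bytes
total_bytes = 4096 * bytes_per_vector  # default max_items=4096
print(total_bytes / (1024 * 1024))     # prints 16.0 (MiB)
```

Scale `max_items` proportionally: doubling it to 8192 roughly doubles the footprint to 32 MiB.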
Hit Rate:
  • Monitor cache.stats['hit_rate']
  • Good hit rate: >50% for typical workloads
  • Low hit rate indicates max_items is too small or text variety is very high
Why Shard:
  • Some filesystems struggle with >10K files in a single directory
  • 2-level sharding ({key[:2]}/{key[2:4]}/) fans out across up to 256 directories per level (65,536 leaf directories), keeping per-directory file counts small
Cache Size:
  • Each record is 1-10KB (depends on entity count)
  • 10K cached articles ≈ 10-100MB disk space
  • No automatic eviction — old versions accumulate in v1/, v2/, etc.
LRUCache:
  • All methods use a single lock (threading.Lock)
  • Safe for concurrent access from worker threads
  • Lock contention is minimal for typical workloads
ExtractionSidecarCache:
  • Writes use os.replace() (atomic on POSIX)
  • Concurrent reads of the same key are safe
  • Concurrent writes to the same key are serialized by filesystem
Extraction Cache Invalidation:
  • Bump cache.extraction.version in domain config
  • Old vN/ directories can be deleted manually after version bump
  • Cache keys include all output-affecting inputs (prompt, schema, model, temperature)
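Manual cleanup of stale version directories after a bump could look like this hypothetical helper (not part of the module's API):

```python
import re
import shutil
from pathlib import Path

def prune_old_versions(cache_root: str, current_version: int) -> None:
    # Remove v1/, v2/, ... directories older than the current version
    for child in Path(cache_root).iterdir():
        m = re.fullmatch(r"v(\d+)", child.name)
        if m and child.is_dir() and int(m.group(1)) < current_version:
            shutil.rmtree(child)
```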
Embedding Cache Invalidation:
  • Keyed by (fingerprint, text_hash) where fingerprint is "{model}:{dimension}"
  • Changing model or dimension automatically invalidates old entries
  • No manual invalidation needed

See Also

  • Embeddings - Uses LRUCache for embedding vectors
  • Extractors - Uses ExtractionSidecarCache to avoid redundant LLM calls
  • Quality Controls - Validation applied to cached extraction results
