Skip redundant computation by caching node results. Same inputs produce the same outputs — hypergraph can remember that.
## When to Use
- Expensive computations you call repeatedly with the same inputs (embeddings, LLM calls)
- Development iteration where you re-run a graph but only change downstream nodes
- Batch processing where many items share common intermediate results
## Basic Pattern

Mark a node with `cache=True` and pass a cache backend to the runner:

```python
from hypergraph import Graph, node, SyncRunner, InMemoryCache

@node(output_name="embedding", cache=True)
def embed(text: str) -> list[float]:
    # Expensive API call — only runs once per unique input
    return model.embed(text)

@node(output_name="answer")
def generate(embedding: list[float], query: str) -> str:
    return llm.generate(embedding, query)

graph = Graph(nodes=[embed, generate])
runner = SyncRunner(cache=InMemoryCache())

# First call — embed executes normally
result = runner.run(graph, {"text": "hello", "query": "What is this?"})

# Second call with the same text — embed is served from the cache
result = runner.run(graph, {"text": "hello", "query": "Different question"})
```
Caching requires two pieces:

- **Node opt-in** — `@node(..., cache=True)` on the nodes you want cached
- **Runner backend** — `SyncRunner(cache=InMemoryCache())` or `AsyncRunner(cache=...)`
## Cache Backends

### InMemoryCache

Fast, ephemeral. Lives for the duration of the process.

```python
from hypergraph import InMemoryCache

# Unlimited size
cache = InMemoryCache()

# LRU eviction after 1000 entries
cache = InMemoryCache(max_size=1000)
```
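To make the `max_size` behavior concrete, here is a minimal, self-contained sketch of an LRU-evicting backend built on `collections.OrderedDict`. This is illustrative only — it is not hypergraph's actual implementation, and the class name is made up:

```python
from collections import OrderedDict


class TinyLRUCache:
    """Illustrative LRU cache: evicts the least recently used entry
    once max_size is exceeded."""

    def __init__(self, max_size=None):
        self.max_size = max_size
        self._data = OrderedDict()

    def get(self, key: str) -> tuple[bool, object]:
        if key not in self._data:
            return False, None  # cache miss
        self._data.move_to_end(key)  # mark as recently used
        return True, self._data[key]

    def set(self, key: str, value: object) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if self.max_size is not None and len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict least recently used


cache = TinyLRUCache(max_size=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")     # touch "a", so "b" is now least recently used
cache.set("c", 3)  # evicts "b"
```

The key point is that `get` also counts as "use": an entry you keep reading stays resident, while untouched entries age out first.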
### DiskCache

Persistent across runs. Requires the optional `diskcache` dependency:

```bash
pip install 'hypergraph[cache]'
```

```python
from hypergraph import DiskCache

# Persists to ~/.cache/hypergraph (default)
cache = DiskCache()

# Custom directory
cache = DiskCache(cache_dir="/tmp/my-project-cache")
runner = SyncRunner(cache=cache)

# Results survive process restarts
result = runner.run(graph, {"text": "hello", "query": "Q1"})
# ... restart process ...
# embed is still cached from the previous run
```
## Integrity Verification

`DiskCache` stores serialized bytes plus an HMAC-SHA256 signature:

- **On write:** the value is serialized, signed, and stored with its signature
- **On read:** the signature is verified before deserialization

This prevents deserializing tampered cache payloads. If an entry is corrupted, missing a signature, has invalid metadata, or fails deserialization, hypergraph evicts it and treats it as a cache miss.
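The sign-then-verify flow can be sketched with the standard library. This is a simplified model of the scheme described above, not hypergraph's actual code; the secret key shown here is a placeholder:

```python
import hashlib
import hmac
import pickle

SECRET_KEY = b"example-secret"  # placeholder; a real key must be managed securely


def store(value: object) -> tuple[bytes, bytes]:
    """Serialize the value and sign the payload with HMAC-SHA256."""
    payload = pickle.dumps(value)
    signature = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest()
    return payload, signature


def load(payload: bytes, signature: bytes) -> tuple[bool, object]:
    """Verify the signature BEFORE deserializing; treat failures as a miss."""
    expected = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, signature):
        return False, None  # tampered or corrupted: evict, report a miss
    return True, pickle.loads(payload)


payload, sig = store([1.0, 2.0, 3.0])
ok, value = load(payload, sig)         # valid entry deserializes
bad, _ = load(payload + b"x", sig)     # tampered payload is rejected
```

The essential ordering is that verification happens before `pickle.loads`: a forged payload never reaches the deserializer, which is what makes pickle-based disk caching safe against tampering.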
## Custom Backend

Implement the `CacheBackend` protocol for Redis, databases, or anything else:

```python
import pickle

import redis

from hypergraph import CacheBackend

class RedisCache(CacheBackend):
    def __init__(self, client: redis.Redis):
        self.client = client

    def get(self, key: str) -> tuple[bool, object]:
        value = self.client.get(key)
        if value is None:
            return False, None  # cache miss
        return True, pickle.loads(value)

    def set(self, key: str, value: object) -> None:
        self.client.set(key, pickle.dumps(value))
```
## How Cache Keys Work

Cache keys are computed from:

- **Node identity** — a hash of the function's source code (`definition_hash`)
- **Input values** — a deterministic hash of all inputs passed to the node

If you change the function body, the cache automatically invalidates. If inputs aren't picklable, the node falls back to uncached execution (with a warning).

```python
# First run
embed("hello")  # Executes, caches result

# Second run with same input
embed("hello")  # Cache hit, skips execution
```
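A simplified model of this key derivation is sketched below. Note this is illustrative: hypergraph hashes the function's source, while this sketch hashes the compiled bytecode for portability, and the `cache_key` helper is made up for the example:

```python
import hashlib
import pickle


def cache_key(func, inputs: dict) -> str:
    """Combine a hash of the function definition with a hash of its inputs."""
    # Hash the function body (bytecode here; hypergraph uses source text)
    definition_hash = hashlib.sha256(func.__code__.co_code).hexdigest()
    # Sort inputs by name so the same values always produce the same bytes
    input_bytes = pickle.dumps(sorted(inputs.items()))
    input_hash = hashlib.sha256(input_bytes).hexdigest()
    return f"{definition_hash[:16]}:{input_hash[:16]}"


def embed(text: str) -> list[float]:
    return [0.0]


k1 = cache_key(embed, {"text": "hello"})
k2 = cache_key(embed, {"text": "hello"})  # same function, same inputs: same key
k3 = cache_key(embed, {"text": "world"})  # different inputs: different key
```

Because the definition hash is part of the key, editing the function body changes every key it produces, which is why stale results are never served after a code change.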
## Observing Cache Hits

Cache events integrate with the event system:

```python
from hypergraph import TypedEventProcessor, CacheHitEvent, NodeEndEvent

class CacheMonitor(TypedEventProcessor):
    def __init__(self):
        self.hits = 0
        self.misses = 0

    def on_cache_hit(self, event: CacheHitEvent) -> None:
        self.hits += 1
        print(f"Cache hit: {event.node_name}")

    def on_node_end(self, event: NodeEndEvent) -> None:
        if not event.cached:
            self.misses += 1

monitor = CacheMonitor()
result = runner.run(graph, inputs, event_processors=[monitor])
print(f"Hits: {monitor.hits}, Misses: {monitor.misses}")
```

The event sequence for a cache hit is:

```
NodeStartEvent(node_name="embed")
CacheHitEvent(node_name="embed", cache_key="abc123...")
NodeEndEvent(node_name="embed", cached=True, duration_ms=0.0)
```
## Caching Route and IfElse Nodes

Gate nodes (`@route`, `@ifelse`) are cacheable. The routing function's return value is cached, and the runner restores the routing decision on a cache hit:

```python
@route(targets=["fast_path", "full_rag", END], cache=True)
def classify_query(query: str) -> str:
    """Expensive classification — cache the decision."""
    category = llm.classify(query)
    if category == "faq":
        return "fast_path"
    elif category == "complex":
        return "full_rag"
    return END
```

On a cache hit, the runner replays the cached routing decision without calling the function again. Downstream routing still works correctly — the cached decision is restored into the graph state.
## Restrictions

### GraphNode

Nested graphs reject `cache=True` at build time. Cache individual nodes inside them instead.

### InterruptNode

`InterruptNode` supports `cache=True` (defaults to `False`). When cached, a previously auto-resolved response is replayed without re-running the handler.

```python
@interrupt(output_name="decision", cache=True)
def approval(draft: str) -> str:
    # Expensive LLM call to auto-approve
    return llm_reviewer.approve(draft)
```
## Real-World Example: Cached RAG Pipeline

```python
from hypergraph import Graph, node, SyncRunner, InMemoryCache

@node(output_name="embedding", cache=True)
def embed(text: str) -> list[float]:
    """Embedding API call — $0.0001 per call."""
    return openai.embeddings.create(input=text, model="text-embedding-3-small")

@node(output_name="docs", cache=True)
def retrieve(embedding: list[float], top_k: int = 5) -> list[str]:
    """Vector DB search — 50ms per query."""
    return pinecone_index.query(embedding, top_k=top_k)

@node(output_name="answer")
def generate(docs: list[str], query: str) -> str:
    """LLM generation — not cached (we want fresh answers)."""
    return llm.chat(docs=docs, query=query)

graph = Graph(nodes=[embed, retrieve, generate])
runner = SyncRunner(cache=InMemoryCache(max_size=500))

# During development: re-run with different prompts.
# embed and retrieve are cached — only generate re-executes.
for query in ["What is RAG?", "How does retrieval work?", "What is RAG?"]:
    result = runner.run(graph, {"text": "RAG tutorial", "query": query})
# Third query hits cache for both embed AND retrieve
```

Cache expensive operations (embeddings, retrieval) but leave final generation uncached if you want fresh responses.
## What's Next?