Why This Example?
This example showcases hypergraph’s key strength: a DAG (retrieval) nested inside a cycle (conversation). Pure DAG frameworks can’t express this, because they can’t loop back to handle a follow-up question. In hypergraph, the retrieval graph simply becomes one node inside the conversation loop.

The Architecture
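Without the framework, the control flow is an inner pipeline that runs straight through (embed → retrieve → generate) inside an outer loop. The loop appends to the history and then decides whether to go around again. The plain-Python sketch below illustrates only that shape. It is not hypergraph code, and the `fake_*` stages are hypothetical stubs standing in for the OpenAI, Chroma, and Anthropic calls used in the implementation.

```python
def fake_embed(user_input: str, history: list[dict]) -> str:
    # Stub: a real system would return an embedding vector.
    return f"{len(history)}:{user_input}"


def fake_retrieve(query_embedding: str, history: list[dict]) -> list[str]:
    # Stub: 3 documents on the first turn, 5 on follow-ups (same rule as retrieve).
    k = 3 if not history else 5
    return [f"doc{i}" for i in range(k)]


def fake_generate(retrieved_docs: list[str], user_input: str) -> str:
    # Stub: a real system would call an LLM with the documents as context.
    return f"answer to {user_input!r} using {len(retrieved_docs)} docs"


def rag(user_input: str, history: list[dict]) -> str:
    """Inner DAG: each step runs once, in dependency order."""
    query_embedding = fake_embed(user_input, history)
    docs = fake_retrieve(query_embedding, history)
    return fake_generate(docs, user_input)


def conversation(user_turns: list[str], max_messages: int = 20) -> list[dict]:
    """Outer cycle: run the DAG, accumulate history, then loop back or stop."""
    history: list[dict] = []
    for user_input in user_turns:
        response = rag(user_input, history)
        history = history + [
            {"role": "user", "content": user_input},
            {"role": "assistant", "content": response},
        ]
        if len(history) >= max_messages:  # routing decision: END
            break
    return history


for message in conversation(["What is RAG?", "Tell me more about that"]):
    print(f"{message['role']}: {message['content']}")
```

The inner pipeline has no loop of its own. All repetition lives in the outer function, which is the same split the `rag` graph and the `multi_turn_rag` graph make.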
Implementation
```python
from hypergraph import Graph, node, route, END, AsyncRunner
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
import chromadb

# Async clients, so awaiting a network call doesn't block the event loop
openai_client = AsyncOpenAI()
anthropic_client = AsyncAnthropic()

vector_db = chromadb.Client()
collection = vector_db.get_or_create_collection("docs")


@node(output_name="query_embedding")
async def embed_query(user_input: str, history: list) -> list[float]:
    """
    Embed the query with conversation context.
    Include recent history for better retrieval.
    """
    # Build context-aware query
    recent_context = ""
    if history:
        recent_exchanges = history[-4:]  # Last 2 exchanges
        recent_context = "\n".join(
            f"{msg['role']}: {msg['content']}" for msg in recent_exchanges
        )
    contextualized_query = f"{recent_context}\n\nCurrent question: {user_input}"

    response = await openai_client.embeddings.create(
        model="text-embedding-3-large",
        input=contextualized_query,
    )
    return response.data[0].embedding


@node(output_name="retrieved_docs")
async def retrieve(query_embedding: list[float], history: list) -> list[str]:
    """
    Retrieve relevant documents.
    Adjust retrieval based on conversation state.
    """
    # More documents for follow-up questions (they're often more specific)
    k = 3 if len(history) == 0 else 5

    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=k,
        include=["documents"],
    )
    return results["documents"][0]


@node(output_name="response")
async def generate(
    retrieved_docs: list[str],
    user_input: str,
    history: list,
    system_prompt: str = "You are a helpful assistant.",
) -> str:
    """Generate response using retrieved context and conversation history."""
    # Build the context block
    context = "\n\n---\n\n".join(retrieved_docs)

    # Build messages for the LLM (copy so the caller's history isn't mutated)
    messages = history.copy()
    messages.append({"role": "user", "content": user_input})

    message = await anthropic_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system=f"{system_prompt}\n\nContext:\n{context}",
        messages=messages,
    )
    return message.content[0].text


# The RAG pipeline as a composable unit
rag_pipeline = Graph(
    [embed_query, retrieve, generate],
    name="rag",
)
```
The embedding includes conversation history for context-aware retrieval. This helps handle follow-up questions like “Tell me more about that”.
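To see what actually gets embedded, here is the string-building step of `embed_query` pulled out into a plain function (the name `build_contextualized_query` is hypothetical; it makes no API call). It is then applied to a follow-up question:

```python
def build_contextualized_query(user_input: str, history: list[dict]) -> str:
    """Mirror of the query-building step in embed_query (no embedding call)."""
    recent_context = ""
    if history:
        recent_exchanges = history[-4:]  # last 2 user/assistant exchanges
        recent_context = "\n".join(
            f"{msg['role']}: {msg['content']}" for msg in recent_exchanges
        )
    return f"{recent_context}\n\nCurrent question: {user_input}"


history = [
    {"role": "user", "content": "How do I create a Chroma collection?"},
    {"role": "assistant", "content": "Call get_or_create_collection on a client."},
]
print(build_contextualized_query("Tell me more about that", history))
```

On its own, “Tell me more about that” names no topic. The contextualized string carries the previous exchange about Chroma collections, and that is the text the embedding model sees.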
```python
@node(output_name="history")
def accumulate_history(history: list, user_input: str, response: str) -> list:
    """Update conversation history with the new exchange."""
    return history + [
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": response},
    ]


@route(targets=["rag", END])
def should_continue(history: list, response: str) -> str:
    """
    Decide if the conversation should continue.

    In a real system, this might:
    - Check for explicit end signals ("goodbye", "thanks, that's all")
    - Enforce a maximum turn limit
    - Detect conversation completion
    """
    # Check for end signals in the last response
    end_signals = ["goodbye", "have a great day", "is there anything else"]
    if any(signal in response.lower() for signal in end_signals):
        return END

    # Limit conversation length
    if len(history) >= 20:  # 10 exchanges
        return END

    return "rag"  # Continue the conversation


# Compose the full conversation system
conversation = Graph(
    [
        rag_pipeline.as_node(),  # DAG nested in cycle
        accumulate_history,
        should_continue,
    ],
    name="multi_turn_rag",
)
```
The `@route` decorator creates a routing node that directs flow based on its return value. Return `END` to exit the cycle.

```python
import asyncio


async def chat_session():
    """Interactive chat session."""
    runner = AsyncRunner()

    # Initial state
    state = {
        "history": [],
        "system_prompt": "You are a helpful coding assistant with access to documentation.",
    }

    print("Chat started. Type 'quit' to exit.\n")

    while True:
        # Read input in a worker thread so the blocking input() doesn't stall the event loop
        user_input = (await asyncio.to_thread(input, "You: ")).strip()
        if user_input.lower() == "quit":
            break

        # Run one turn
        result = await runner.run(
            conversation,
            {**state, "user_input": user_input},
        )
        print(f"Assistant: {result['response']}\n")

        # Update state for next turn
        state["history"] = result["history"]

        # Stop once the 20-message limit enforced by should_continue is reached
        if len(result["history"]) >= 20:
            print("(Conversation ended - maximum length reached)")
            break


# Run the chat
if __name__ == "__main__":
    asyncio.run(chat_session())
```
Key Design Decisions
Context-Aware Embedding
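The query embedding includes the last four messages of conversation history, so a follow-up like “Tell me more about that” is embedded together with the topic it refers to.

Adaptive Retrieval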
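Pulled out of `retrieve`, the rule is a pure function of the history, which makes it easy to tune or test on its own. The helper name `choose_k` and its keyword parameters are hypothetical:

```python
def choose_k(history: list[dict], first_turn_k: int = 3, follow_up_k: int = 5) -> int:
    """Documents to retrieve: fewer on the first turn, more on follow-ups."""
    return first_turn_k if len(history) == 0 else follow_up_k


first_question = choose_k([])
follow_up = choose_k([
    {"role": "user", "content": "What is RAG?"},
    {"role": "assistant", "content": "Retrieval-augmented generation."},
])
print(first_question, follow_up)  # 3 5
```

Exposing the two values as parameters (an assumption, not part of the original node) lets you adjust them without editing the node body.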
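Retrieval adapts to the conversation state: the first question fetches 3 documents and follow-ups fetch 5, since follow-up questions are often more specific.

Clean History Management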
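The `accumulate_history` node handles state updates by returning a new list instead of mutating its input, so the history flows explicitly from one turn to the next.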
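Isolating the node's body shows why building a new list matters. Here `accumulate` is a hypothetical undecorated copy of `accumulate_history`:

```python
def accumulate(history: list[dict], user_input: str, response: str) -> list[dict]:
    """Same body as accumulate_history, without the @node decorator."""
    return history + [
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": response},
    ]


turn_0: list[dict] = []
turn_1 = accumulate(turn_0, "What is RAG?", "Retrieval-augmented generation.")
turn_2 = accumulate(turn_1, "Why use it?", "It grounds answers in your documents.")

print(len(turn_0), len(turn_1), len(turn_2))  # 0 2 4: earlier snapshots are untouched
```

An in-place `history.append(...)` would instead change the list the caller still holds, so an earlier turn's input would silently grow.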
Flexible Termination
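A plain-Python version of the routing rule makes the stop conditions easy to unit-test and extend. In this sketch `decide_next` is a hypothetical name, the string `"END"` stands in for hypergraph's `END` sentinel, and the user-side check implements one of the extensions suggested in `should_continue`'s docstring:

```python
END = "END"  # stand-in for hypergraph's END sentinel in this sketch

RESPONSE_END_SIGNALS = ("goodbye", "have a great day", "is there anything else")
USER_END_SIGNALS = ("goodbye", "thanks, that's all")  # from should_continue's docstring


def decide_next(history: list[dict], response: str, max_messages: int = 20) -> str:
    """Return END if any stop condition holds, else the node to loop back to."""
    # 1. The assistant's response signals the end of the conversation
    if any(signal in response.lower() for signal in RESPONSE_END_SIGNALS):
        return END
    # 2. The most recent user message contains an explicit end signal
    last_user = next(
        (m["content"].lower() for m in reversed(history) if m["role"] == "user"), ""
    )
    if any(signal in last_user for signal in USER_END_SIGNALS):
        return END
    # 3. Turn limit: 20 messages = 10 user/assistant exchanges
    if len(history) >= max_messages:
        return END
    return "rag"


print(decide_next([], "Here is how collections work."))  # rag
print(decide_next([{"role": "user", "content": "Thanks, that's all"}], "You're welcome."))  # END
```

Each condition is checked independently, so adding another (for example, a completion detector) means adding one more early `return END`.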
The routing logic can end the conversation on any of several conditions: an end signal in the assistant’s response, or a limit of 20 messages (10 exchanges).

Testing
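One way to test pipeline logic without network calls is to replace the model client with a stub. The sketch below assumes a hypothetical variant of `generate`, named `generate_with`, that takes the Anthropic client as a parameter instead of reading the module-level `anthropic_client`. `unittest.mock.AsyncMock` stands in for the async `messages.create` call, so the test can inspect exactly what would have been sent:

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock


async def generate_with(client, retrieved_docs, user_input, history,
                        system_prompt="You are a helpful assistant."):
    """Body of generate, with the client injected so a test can replace it."""
    context = "\n\n---\n\n".join(retrieved_docs)
    messages = history.copy()
    messages.append({"role": "user", "content": user_input})
    message = await client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system=f"{system_prompt}\n\nContext:\n{context}",
        messages=messages,
    )
    return message.content[0].text


def make_fake_client(reply: str):
    """Stand-in whose messages.create is an AsyncMock returning a canned reply."""
    fake_message = SimpleNamespace(content=[SimpleNamespace(text=reply)])
    return SimpleNamespace(messages=SimpleNamespace(create=AsyncMock(return_value=fake_message)))


async def test_generate_uses_context_and_history():
    client = make_fake_client("stub answer")
    history = [{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello!"}]

    answer = await generate_with(client, ["doc A", "doc B"], "What is X?", history)

    assert answer == "stub answer"
    sent = client.messages.create.await_args.kwargs
    assert "doc A\n\n---\n\ndoc B" in sent["system"]
    assert sent["messages"][-1] == {"role": "user", "content": "What is X?"}
    assert len(history) == 2  # the caller's history list was not mutated


asyncio.run(test_generate_uses_context_and_history())
print("ok")
```

The same stubbing approach applies to `embed_query`, with a fake object exposing `embeddings.create`.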
Because `rag_pipeline` is a graph in its own right, you can test it independently of the conversation loop.

Extensions
Add Streaming
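One possible shape for streaming, sketched under assumptions: print text chunks as they arrive, and still return the full text so `accumulate_history` receives a complete `response`. So that it runs offline, the sketch feeds in a hypothetical `fake_token_stream` async generator instead of a real model stream. How hypergraph nodes themselves interact with streaming is not covered here.

```python
import asyncio
from typing import AsyncIterator


async def fake_token_stream(text: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for a model's text stream: yields one word at a time."""
    for word in text.split(" "):
        await asyncio.sleep(0)  # hand control back to the event loop, as network I/O would
        yield word + " "


async def stream_response(chunks: AsyncIterator[str]) -> str:
    """Print each chunk as it arrives and return the full text for the history."""
    parts: list[str] = []
    async for chunk in chunks:
        print(chunk, end="", flush=True)
        parts.append(chunk)
    print()
    return "".join(parts).rstrip()


full = asyncio.run(stream_response(fake_token_stream("Hypergraph nests DAGs in cycles.")))
```

With the Anthropic SDK, the chunks would come from `stream.text_stream` inside `async with anthropic_client.messages.stream(model=..., max_tokens=..., system=..., messages=...) as stream:`.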
Add Memory Summarization
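A minimal sketch of one approach: keep the most recent messages verbatim and replace everything older with a summary carried in the system prompt. `compact_history` and `naive_summarize` are hypothetical helpers, and a real summarizer would typically be another LLM call.

```python
from typing import Callable


def compact_history(
    history: list[dict],
    summarize: Callable[[list[dict]], str],
    keep_last: int = 4,  # keep this even so user/assistant pairs stay intact
) -> tuple[str, list[dict]]:
    """Split history into a summary of older messages plus the most recent ones verbatim."""
    if len(history) <= keep_last:
        return "", history
    split = len(history) - keep_last
    return summarize(history[:split]), history[split:]


def naive_summarize(messages: list[dict]) -> str:
    """Hypothetical placeholder; a real system would ask an LLM for the summary."""
    return "Earlier topics: " + "; ".join(m["content"] for m in messages if m["role"] == "user")


history = [
    {"role": "user", "content": "What is RAG?"},
    {"role": "assistant", "content": "Retrieval-augmented generation."},
    {"role": "user", "content": "Which vector DB?"},
    {"role": "assistant", "content": "Chroma in this example."},
    {"role": "user", "content": "How many docs per query?"},
    {"role": "assistant", "content": "3 at first, then 5."},
]
summary, recent = compact_history(history, naive_summarize)
system_prompt = f"You are a helpful assistant.\n\nConversation so far: {summary}"
print(summary)      # Earlier topics: What is RAG?
print(len(recent))  # 4
```

The summary goes into `system_prompt`, which `generate` already accepts, rather than into the message list. That way the messages still start with a user turn and keep their user/assistant alternation.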
For long conversations, summarize older history so the prompt doesn’t grow without bound.

Add Citation Tracking
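A sketch of one approach, assuming the system prompt asks the model to cite documents by number (for example `[2]`). Label each retrieved document in the context, then map the markers in the answer back to documents. All function names here are hypothetical. In the graph, the final step would be a node with two outputs, `("response", "citations")`.

```python
import re


def build_numbered_context(retrieved_docs: list[str]) -> str:
    """Label each document [1], [2], ... so the model can cite it by number."""
    return "\n\n---\n\n".join(
        f"[{i}] {doc}" for i, doc in enumerate(retrieved_docs, start=1)
    )


def extract_citations(response: str, retrieved_docs: list[str]) -> list[str]:
    """Documents referenced by [n] markers in the response, in first-cited order."""
    seen: set[int] = set()
    cited: list[str] = []
    for match in re.finditer(r"\[(\d+)\]", response):
        index = int(match.group(1)) - 1
        if 0 <= index < len(retrieved_docs) and index not in seen:
            seen.add(index)
            cited.append(retrieved_docs[index])
    return cited


def finish_with_citations(response: str, retrieved_docs: list[str]) -> tuple[str, list[str]]:
    """Two outputs from one step: the answer text and the documents it cites."""
    return response, extract_citations(response, retrieved_docs)


docs = ["Chroma stores embeddings.", "Hypergraph nests graphs.", "Claude writes answers."]
response, citations = finish_with_citations(
    "Graphs nest [2], and vectors live in Chroma [1][2].", docs
)
print(citations)  # ['Hypergraph nests graphs.', 'Chroma stores embeddings.']
```

Out-of-range markers such as `[9]` are ignored rather than raising, since model output can't be trusted to cite only valid numbers.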
Nodes can return multiple outputs using tuples. Specify them with `output_name=("name1", "name2")`.

What’s Next?
Evaluation Harness
Test this conversation system at scale
Prompt Optimization
Iteratively improve your prompts
RAG Pipeline
The single-turn version of this system
Hierarchical Composition
More nesting patterns