Skip to main content
A conversational RAG system where users can ask follow-up questions. The system retrieves new context based on the evolving conversation.

Why This Example?

This showcases hypergraph’s key strength: a DAG (retrieval) nested inside a cycle (conversation). Pure DAG frameworks can’t do this — they can’t loop back for follow-up questions. Hypergraph handles it naturally.

The Architecture

┌────────────────────────────────────────────────────────────┐
│                   CONVERSATION LOOP                        │
│                                                            │
│  user_input → RAG_PIPELINE → accumulate → should_continue  │
│       ↑           │                            │           │
│       │           ▼                            │           │
│       │    ┌─────────────┐                     │           │
│       │    │ embed       │                     │           │
│       │    │     ↓       │                     │           │
│       │    │ retrieve    │ (DAG)               │           │
│       │    │     ↓       │                     │           │
│       │    │ generate    │                     │           │
│       │    └─────────────┘                     │           │
│       │                                        │           │
│       └────────────────────────────────────────┘           │
│                                        │                   │
│                                        ▼                   │
│                                       END                  │
└────────────────────────────────────────────────────────────┘

Implementation

Step 1: Define the RAG pipeline (DAG)

This runs once per conversation turn:

from hypergraph import Graph, node, route, END, AsyncRunner
from anthropic import Anthropic
from openai import OpenAI
import chromadb

# Module-level clients shared by the nodes defined below.
openai_client = OpenAI()  # used by embed_query to create embeddings
anthropic_client = Anthropic()  # used by the generation nodes
vector_db = chromadb.Client()
# Fetch the "docs" collection, creating it if missing (per the method name).
# NOTE(review): nothing on this page adds documents to it — presumably it is
# populated elsewhere before retrieval runs; confirm.
collection = vector_db.get_or_create_collection("docs")

@node(output_name="query_embedding")
async def embed_query(user_input: str, history: list) -> list[float]:
    """
    Embed the query with conversation context.

    Up to the last four history messages (two user/assistant exchanges) are
    prepended to the question so that follow-ups such as "Tell me more about
    that" are embedded together with the topic under discussion.

    Args:
        user_input: The user's current question.
        history: Prior messages, oldest first, as dicts with "role" and
            "content" keys. May be empty.

    Returns:
        The embedding vector for the contextualized query.
    """
    # Function-scope import keeps this snippet self-contained.
    import asyncio

    # Build context-aware query
    recent_context = ""
    if history:
        recent_exchanges = history[-4:]  # Last 2 exchanges
        recent_context = "\n".join(
            f"{msg['role']}: {msg['content']}" for msg in recent_exchanges
        )

    contextualized_query = f"{recent_context}\n\nCurrent question: {user_input}"

    # openai_client is the synchronous client: calling it directly inside a
    # coroutine would block the event loop for the whole HTTP round trip, so
    # the request is handed to a worker thread instead.
    response = await asyncio.to_thread(
        openai_client.embeddings.create,
        model="text-embedding-3-large",
        input=contextualized_query,
    )
    return response.data[0].embedding


@node(output_name="retrieved_docs")
async def retrieve(query_embedding: list[float], history: list) -> list[str]:
    """
    Fetch the documents closest to the query embedding.

    The first turn (empty history) requests 3 documents; every later turn
    requests 5.
    """
    k = 3 if len(history) == 0 else 5

    # Exactly one embedding is submitted, so only the first per-query
    # document list in the result is of interest.
    matches = collection.query(
        include=["documents"],
        n_results=k,
        query_embeddings=[query_embedding],
    )
    documents_per_query = matches["documents"]
    return documents_per_query[0]


@node(output_name="response")
async def generate(
    retrieved_docs: list[str],
    user_input: str,
    history: list,
    system_prompt: str = "You are a helpful assistant.",
) -> str:
    """
    Generate a response using retrieved context and conversation history.

    Args:
        retrieved_docs: Document texts to expose to the model as context.
        user_input: The user's current question.
        history: Prior messages, oldest first, as dicts with "role" and
            "content" keys. Not mutated.
        system_prompt: Instructions placed ahead of the context block in the
            system prompt.

    Returns:
        The text of the first content block of the model's reply.
    """
    # Function-scope import keeps this snippet self-contained.
    import asyncio

    # Build the context block
    context = "\n\n---\n\n".join(retrieved_docs)

    # Build messages for the LLM in a new list so the caller's history
    # is left untouched.
    messages = [*history, {"role": "user", "content": user_input}]

    # anthropic_client is the synchronous client: run the request in a worker
    # thread so the event loop is not blocked while the model responds.
    message = await asyncio.to_thread(
        anthropic_client.messages.create,
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system=f"{system_prompt}\n\nContext:\n{context}",
        messages=messages,
    )

    return message.content[0].text


# The RAG pipeline as a composable unit
# Chains embed_query -> retrieve -> generate through their output names
# ("query_embedding", "retrieved_docs", "response").
# The name "rag" is also the string should_continue returns to route back
# into this pipeline — presumably the two must stay in sync; verify against
# hypergraph's routing rules.
rag_pipeline = Graph(
    [embed_query, retrieve, generate],
    name="rag",
)
Note: The embedding includes conversation history for context-aware retrieval. This helps handle follow-up questions like “Tell me more about that”.

Step 2: Define the conversation loop

Wrap the RAG pipeline in a cyclic graph:

@node(output_name="history")
def accumulate_history(history: list, user_input: str, response: str) -> list:
    """Return a new history list extended with this turn's two messages."""
    user_message = {"role": "user", "content": user_input}
    assistant_message = {"role": "assistant", "content": response}
    # Concatenation builds a fresh list; the incoming history is not modified.
    return history + [user_message, assistant_message]


@route(targets=["rag", END])
def should_continue(history: list, response: str) -> str:
    """
    Route back into the RAG pipeline or end the run.

    Returns END when the latest response contains one of the end-signal
    phrases (case-insensitive) or when the history holds 20 or more
    messages; otherwise returns "rag".

    A production router might additionally look for explicit user sign-offs
    or other completion cues.
    """
    end_signals = ["goodbye", "have a great day", "is there anything else"]
    lowered_response = response.lower()

    # Content-based stop: the assistant's reply sounds like a sign-off.
    if any(phrase in lowered_response for phrase in end_signals):
        return END

    # Length-based stop: 20 messages corresponds to 10 exchanges.
    return END if len(history) >= 20 else "rag"


# Compose the full conversation system
conversation = Graph(
    [
        rag_pipeline.as_node(),  # DAG nested in cycle
        accumulate_history,
        should_continue,
    ],
    name="multi_turn_rag",
)
Note: The @route decorator creates a routing node that directs flow based on the return value. Return END to exit the cycle.

Step 3: Run the conversation

Create an interactive chat session:

async def chat_session() -> None:
    """
    Interactive chat session.

    Reads messages from stdin, runs the conversation graph once per message,
    prints the reply, and carries the returned history into the next turn.
    The session ends when the user types 'quit', when stdin reaches
    end-of-input, or once the history holds 20 or more messages.
    """
    runner = AsyncRunner()

    # Initial state
    state = {
        "history": [],
        "system_prompt": "You are a helpful coding assistant with access to documentation.",
    }

    print("Chat started. Type 'quit' to exit.\n")

    while True:
        try:
            user_input = input("You: ").strip()
        except EOFError:
            # stdin was closed (Ctrl-D / piped input exhausted): exit cleanly
            # instead of surfacing a traceback.
            print()
            break

        if user_input.lower() == "quit":
            break
        if not user_input:
            # A blank line carries no question to embed or answer, and an
            # empty user message is not a meaningful model input; re-prompt.
            continue

        # Run one turn
        result = await runner.run(
            conversation,
            {**state, "user_input": user_input},
        )

        print(f"Assistant: {result['response']}\n")

        # Update state for next turn
        state["history"] = result["history"]

        # Mirror the router's length limit so the loop stops prompting once
        # the conversation has reached its maximum size.
        if len(result["history"]) >= 20:
            print("(Conversation ended - maximum length reached)")
            break


# Run the chat
import asyncio

if __name__ == "__main__":
    # Guarded so that importing this module (for example from a test file)
    # does not start an interactive session as a side effect.
    asyncio.run(chat_session())

Key Design Decisions

Context-Aware Embedding

The query embedding includes recent conversation history:
contextualized_query = f"{recent_context}\n\nCurrent question: {user_input}"
This helps retrieval understand follow-up questions like “Tell me more about that” or “What about the second point?”

Adaptive Retrieval

Adjust retrieval strategy based on conversation state:
k = 3 if len(history) == 0 else 5
Initial questions get fewer documents (broad context). Follow-ups get more (specific details).

Clean History Management

The accumulate_history node handles state updates:
return history + [
    {"role": "user", "content": user_input},
    {"role": "assistant", "content": response},
]
No mutation. No side effects. Just pure data transformation.

Flexible Termination

The routing logic can terminate based on multiple conditions:
@route(targets=["rag", END])
def should_continue(history: list, response: str) -> str:
    # Abridged excerpt: end_signals is the phrase list defined inside the
    # full should_continue shown in the Implementation section.
    # Content-based termination
    if any(signal in response.lower() for signal in end_signals):
        return END
    # Length-based termination
    if len(history) >= 20:
        return END
    return "rag"

Testing

Test the RAG pipeline independently:
import pytest

@pytest.mark.asyncio
async def test_rag_pipeline():
    """Run the RAG DAG on its own for a first-turn question."""
    inputs = {
        "user_input": "How do I create a graph?",
        "history": [],
    }

    outputs = await AsyncRunner().run(rag_pipeline, inputs)

    # The pipeline must produce an answer and have retrieved something.
    assert "response" in outputs
    assert len(outputs["retrieved_docs"]) > 0
Test the full conversation:
@pytest.mark.asyncio
async def test_multi_turn():
    """Two turns: history grows by one user and one assistant message each."""
    runner = AsyncRunner()

    # First turn starts from an empty history.
    first_turn = await runner.run(
        conversation,
        {"user_input": "What is hypergraph?", "history": []},
    )
    assert len(first_turn["history"]) == 2  # 1 exchange

    # Second turn - follow-up question fed with the history returned above.
    second_turn = await runner.run(
        conversation,
        {"user_input": "How do I install it?", "history": first_turn["history"]},
    )
    assert len(second_turn["history"]) == 4  # 2 exchanges

Extensions

Add Streaming

@node(output_name="response")
async def generate_streaming(
    retrieved_docs: list[str],
    user_input: str,
    history: list,
    system_prompt: str = "Answer based on context.",
) -> str:
    """
    Generate a response, echoing it to stdout as it streams in.

    Args:
        retrieved_docs: Document texts to expose to the model as context.
        user_input: The user's current question.
        history: Prior messages, oldest first, as dicts with "role" and
            "content" keys. Not mutated.
        system_prompt: Instructions placed ahead of the context block in the
            system prompt. Accepting it here gives this node the same inputs
            as the non-streaming generate node.

    Returns:
        The complete response text (all streamed chunks joined).
    """
    # Function-scope import keeps this snippet self-contained.
    import asyncio

    context = "\n\n---\n\n".join(retrieved_docs)
    messages = history + [{"role": "user", "content": user_input}]

    def _stream_to_stdout() -> str:
        """Consume the synchronous stream, printing and collecting chunks."""
        response_chunks = []
        with anthropic_client.messages.stream(
            model="claude-sonnet-4-5-20250929",
            max_tokens=2048,
            system=f"{system_prompt}\n\nContext:\n{context}",
            messages=messages,
        ) as stream:
            for text in stream.text_stream:
                print(text, end="", flush=True)
                response_chunks.append(text)
        return "".join(response_chunks)

    # The stream helper used here is synchronous; iterating it directly in a
    # coroutine would block the event loop for the entire generation, so it
    # runs in a worker thread.
    return await asyncio.to_thread(_stream_to_stdout)

Add Memory Summarization

For long conversations, summarize older history:
@node(output_name="compressed_history")
def compress_history(history: list) -> list:
    """
    Summarize older messages to save tokens.

    Histories of 10 or fewer messages are returned unchanged. Otherwise the
    last 6 messages are kept verbatim and everything before them is replaced
    by a single model-written summary message.

    Args:
        history: Messages, oldest first, as dicts with "role" and "content"
            keys.

    Returns:
        Either the original history, or a new list consisting of one summary
        message followed by the 6 most recent messages.
    """
    if len(history) <= 10:
        return history

    # Keep recent messages, summarize older ones
    recent = history[-6:]
    older = history[:-6]

    # Create summary of older messages
    older_text = "\n".join(f"{m['role']}: {m['content']}" for m in older)

    message = anthropic_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=300,
        messages=[{
            "role": "user",
            "content": f"Summarize this conversation in 2-3 sentences:\n\n{older_text}",
        }],
    )

    summary = message.content[0].text

    # The summary is carried as a "user" message: the Anthropic Messages API
    # only accepts "user" and "assistant" roles inside `messages` (system
    # text goes in the top-level `system` parameter), so a "system" entry
    # here would make any later messages.create call on this history fail.
    return [
        {"role": "user", "content": f"Previous conversation summary: {summary}"},
        *recent,
    ]

Add Citation Tracking

@node(output_name=("response", "citations"))
async def generate_with_citations(
    retrieved_docs: list[str],
    user_input: str,
    history: list,
) -> tuple[str, list[str]]:
    """
    Generate a response and track which docs were cited.

    Documents are presented to the model numbered from 1, and the model is
    asked to cite them as [1], [2], and so on.

    Args:
        retrieved_docs: Document texts to expose to the model as context.
        user_input: The user's current question.
        history: Prior messages, oldest first, as dicts with "role" and
            "content" keys. Not mutated.

    Returns:
        A (response, citations) pair, where citations holds every retrieved
        document whose bracketed number appears in the response text, in
        retrieval order.
    """
    # Function-scope import keeps this snippet self-contained.
    import asyncio

    # Number the documents
    context_parts = [f"[{i}] {doc}" for i, doc in enumerate(retrieved_docs, 1)]
    context = "\n\n".join(context_parts)

    messages = history + [{"role": "user", "content": user_input}]

    # anthropic_client is the synchronous client: run the request in a worker
    # thread so the event loop is not blocked while the model responds.
    message = await asyncio.to_thread(
        anthropic_client.messages.create,
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system=f"Cite sources using [1], [2], etc.\n\nContext:\n{context}",
        messages=messages,
    )

    response = message.content[0].text

    # Extract which docs were cited (plain substring match on "[n]")
    citations = [
        doc for i, doc in enumerate(retrieved_docs, 1)
        if f"[{i}]" in response
    ]

    return response, citations
Nodes can return multiple outputs using tuples. Specify them with output_name=("name1", "name2").

What’s Next?

Evaluation Harness

Test this conversation system at scale

Prompt Optimization

Iteratively improve your prompts

RAG Pipeline

The single-turn version of this system

Hierarchical Composition

More nesting patterns

Build docs developers (and LLMs) love