Skip to main content
A single-pass RAG pipeline that embeds queries, retrieves relevant documents, and generates answers with source citations.

When to Use

  • Question-answering over documents
  • Knowledge base search
  • Single-turn information retrieval
For multi-turn conversations with follow-up questions, see Multi-Turn RAG.

The Pipeline

query → embed → retrieve → generate → answer
This is a pure DAG: no cycles, no branching. Data flows in one direction from input to output.

Implementation

1
Set up dependencies
2
Install the required packages:
3
pip install hypergraph anthropic openai chromadb
4
Define the embedding node
5
Embed the user’s query using OpenAI’s embedding model:
6
from hypergraph import Graph, node, AsyncRunner
from openai import OpenAI

# Module-level OpenAI client shared by the embedding node below.
# NOTE(review): the SDK reads OPENAI_API_KEY from the environment by default.
openai_client = OpenAI()

@node(output_name="embedding")
async def embed(query: str) -> list[float]:
    """
    Embed the query for vector search.

    Uses OpenAI's text-embedding-3-large model. The module-level OpenAI
    client is synchronous, so the blocking HTTP call is pushed onto a
    worker thread with asyncio.to_thread — calling it directly inside an
    async node would stall the event loop for the whole request.

    Args:
        query: The user's search query.

    Returns:
        The embedding vector for the query.
    """
    import asyncio  # local import keeps this snippet self-contained

    response = await asyncio.to_thread(
        openai_client.embeddings.create,
        model="text-embedding-3-large",
        input=query,
    )
    return response.data[0].embedding
7
The output_name="embedding" creates a named output that downstream nodes can reference.
8
Define the retrieval node
9
Search your vector database for relevant documents:
10
import chromadb

# Module-level vector store shared by the retrieval node below.
# NOTE(review): chromadb.Client() is an ephemeral in-memory client —
# confirm a persistent client is used in production deployments.
vector_db = chromadb.Client()
collection = vector_db.get_or_create_collection("docs")

@node(output_name="docs")
async def retrieve(embedding: list[float], top_k: int = 5) -> list[dict]:
    """
    Search the vector database for documents relevant to the query embedding.

    The Chroma client is synchronous, so the blocking query runs on a
    worker thread via asyncio.to_thread to keep the event loop responsive.

    Args:
        embedding: Query embedding produced by the embed node.
        top_k: Number of documents to return (default 5).

    Returns:
        A list of dicts with "content", "source", and "score" keys,
        in the order returned by the vector store (best match first).
    """
    import asyncio  # local import keeps this snippet self-contained

    results = await asyncio.to_thread(
        collection.query,
        query_embeddings=[embedding],
        n_results=top_k,
        include=["documents", "metadatas", "distances"],
    )

    return [
        {
            "content": doc,
            # Chroma returns None for entries stored without metadata,
            # so guard before .get() to avoid an AttributeError.
            "source": (meta or {}).get("source", "unknown"),
            "score": 1 - distance,  # Convert distance to similarity
        }
        for doc, meta, distance in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0],
        )
    ]
11
The embedding input is automatically wired to the embed node’s output.
12
Define the generation node
13
Generate an answer using Claude with retrieved context:
14
from anthropic import Anthropic

# Module-level Anthropic client shared by the generation nodes below.
# NOTE(review): the SDK reads ANTHROPIC_API_KEY from the environment by default.
anthropic_client = Anthropic()

@node(output_name="answer")
def generate(docs: list[dict], query: str) -> str:
    """
    Produce an answer from the retrieved documents using Claude Sonnet 4.5.

    Each document is numbered so the model can cite it inline as [1], [2], ...

    Args:
        docs: Retrieved documents with "content" and "source" keys.
        query: The original user question.

    Returns:
        The model's answer text.
    """
    # Number each document so the model can reference it by index.
    context = "\n\n".join(
        f"[{idx}] {item['source']}:\n{item['content']}"
        for idx, item in enumerate(docs, 1)
    )

    message = anthropic_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system="""You are a helpful assistant that answers questions based on the provided context.
Always cite your sources using [1], [2], etc.
If the context doesn't contain the answer, say so clearly.""",
        messages=[{
            "role": "user",
            "content": f"Context:\n{context}\n\nQuestion: {query}",
        }],
    )

    return message.content[0].text
15
Both docs and query are automatically wired from upstream nodes.
16
Compose the graph
17
# Compose the three nodes into one pipeline. Edges are inferred from
# matching names: embed's "embedding" output feeds retrieve, and
# retrieve's "docs" output feeds generate.
rag_pipeline = Graph([embed, retrieve, generate], name="rag")

# Check what inputs are needed
print(rag_pipeline.inputs.required)  # ('query',)
print(rag_pipeline.inputs.optional)  # ('top_k',)
18
Hypergraph automatically infers edges based on output and input names. No manual wiring needed!
19
Run the pipeline
20
async def main():
    """Run the RAG pipeline once and print the answer and document count."""
    inputs = {
        "query": "How do I create a graph in hypergraph?",
        "top_k": 5,
    }
    result = await AsyncRunner().run(rag_pipeline, inputs)

    print(f"Answer:\n{result['answer']}")
    print(f"\nRetrieved {len(result['docs'])} documents")

# Drive the async pipeline from a synchronous entry point
import asyncio
asyncio.run(main())

Add Streaming

Stream the generated answer token by token once retrieval has completed:
@node(output_name="answer")
def generate_streaming(docs: list[dict], query: str) -> str:
    """Generate an answer, printing tokens to stdout as they stream in."""
    # Number each document so the model can cite it as [1], [2], ...
    numbered = []
    for idx, item in enumerate(docs, 1):
        numbered.append(f"[{idx}] {item['source']}:\n{item['content']}")
    context = "\n\n".join(numbered)

    pieces: list[str] = []
    print("Answer: ", end="")

    stream_ctx = anthropic_client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system="Answer based on context. Cite sources.",
        messages=[{
            "role": "user",
            "content": f"Context:\n{context}\n\nQuestion: {query}",
        }],
    )
    with stream_ctx as stream:
        for token in stream.text_stream:
            # Echo each chunk as it arrives, then keep it for the return value.
            print(token, end="", flush=True)
            pieces.append(token)

    print("\n")
    return "".join(pieces)

Add Reranking

Add a reranking step for better relevance:
import cohere

# Module-level Cohere client used by the rerank node below.
cohere_client = cohere.Client()

@node(output_name="docs")
async def retrieve(embedding: list[float], top_k: int = 20) -> list[dict]:
    """Retrieve more candidates for reranking."""
    results = collection.query(
        query_embeddings=[embedding],
        n_results=top_k,
    )
    return [{
        "content": doc,
        "source": meta.get("source", "unknown")
    } for doc, meta in zip(results["documents"][0], results["metadatas"][0])]

@node(output_name="reranked_docs")
def rerank(docs: list[dict], query: str, top_k: int = 5) -> list[dict]:
    """Order the candidate documents by relevance via Cohere's rerank API."""
    texts = [entry["content"] for entry in docs]

    response = cohere_client.rerank(
        query=query,
        documents=texts,
        top_n=top_k,
        model="rerank-english-v3.0",
    )

    # Cohere returns indices into the submitted document list; map them
    # back onto the original dicts, best match first.
    return [docs[hit.index] for hit in response.results]

@node(output_name="answer")
def generate(reranked_docs: list[dict], query: str) -> str:
    # Uses reranked_docs instead of docs
    # Body elided in this example — same as the earlier generate node,
    # but it formats context from reranked_docs.
    ...

# rerank's "reranked_docs" output matches this generate's input name,
# so the graph wires embed -> retrieve -> rerank -> generate automatically.
rag_with_rerank = Graph([embed, retrieve, rerank, generate])

Add Query Expansion

Expand the query for better retrieval:
@node(output_name="expanded_query")
def expand_query(query: str) -> str:
    """Rewrite the query with extra related terms to improve retrieval recall."""
    prompt = f"""Expand this search query by adding 2-3 related terms or phrasings.
Return only the expanded query, no explanation.

Original query: {query}"""

    reply = anthropic_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=200,
        messages=[{"role": "user", "content": prompt}],
    )

    return reply.content[0].text

@node(output_name="embedding")
async def embed(expanded_query: str) -> list[float]:
    """
    Embed the expanded query (output of expand_query) for vector search.

    The OpenAI client is synchronous, so the blocking call runs on a
    worker thread via asyncio.to_thread instead of stalling the event loop.

    Args:
        expanded_query: Query text produced by the expand_query node.

    Returns:
        The embedding vector for the expanded query.
    """
    import asyncio  # local import keeps this snippet self-contained

    response = await asyncio.to_thread(
        openai_client.embeddings.create,
        model="text-embedding-3-large",
        input=expanded_query,
    )
    return response.data[0].embedding

# expand_query feeds embed via the matching "expanded_query" name.
rag_with_expansion = Graph([expand_query, embed, retrieve, generate])

Testing

Test individual nodes and the full pipeline:
import pytest
from hypergraph import AsyncRunner

@pytest.fixture
def runner():
    """Provide a fresh AsyncRunner per test so tests share no runner state."""
    return AsyncRunner()

@pytest.mark.asyncio
async def test_rag_pipeline(runner):
    """End-to-end smoke test: runs the full pipeline against live services."""
    result = await runner.run(rag_pipeline, {
        "query": "What is hypergraph?",
    })

    # Loose assertions: model output is nondeterministic, so check shape
    # rather than exact text.
    assert "answer" in result
    assert len(result["answer"]) > 50
    assert len(result["docs"]) > 0

def test_embed():
    """Test embedding in isolation."""
    import asyncio

    # .func exposes the undecorated coroutine function for direct calls.
    result = asyncio.run(embed.func("test query"))

    assert isinstance(result, list)
    assert len(result) > 0
    assert all(isinstance(value, float) for value in result)

def test_generate():
    """Test generation with mock docs."""
    sample_docs = [
        {"content": "Hypergraph is a workflow framework.", "source": "docs"},
        {"content": "It supports DAGs and cycles.", "source": "readme"},
    ]

    answer = generate.func(sample_docs, "What is hypergraph?")

    lowered = answer.lower()
    assert "workflow" in lowered or "framework" in lowered
Every node decorated with @node has a .func attribute for testing without the framework.

What’s Next?

Multi-Turn RAG

Add conversation loops with follow-up questions

Data Pipeline

Build ETL workflows for batch processing

Evaluation Harness

Test your RAG system at scale

Hierarchical Composition

Nest this pipeline in larger workflows

Build docs developers (and LLMs) love