The `output_name="embedding"` argument creates a named output that downstream nodes can reference.
Define the retrieval node
Search your vector database for relevant documents:
```python
import chromadb

vector_db = chromadb.Client()
collection = vector_db.get_or_create_collection("docs")

@node(output_name="docs")
async def retrieve(embedding: list[float], top_k: int = 5) -> list[dict]:
    """
    Search the vector database for relevant documents.
    Returns documents with content and metadata.
    """
    results = collection.query(
        query_embeddings=[embedding],
        n_results=top_k,
        include=["documents", "metadatas", "distances"],
    )
    return [
        {
            "content": doc,
            "source": meta.get("source", "unknown"),
            "score": 1 - distance,  # Convert distance to similarity
        }
        for doc, meta, distance in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0],
        )
    ]
```
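The list comprehension at the end of `retrieve` reshapes ChromaDB's column-oriented query response into a flat list of document dicts. A small sketch against a mocked response (the document texts, sources, and distances below are made-up placeholders) shows the reshaping and the `1 - distance` score conversion:

```python
# Mocked ChromaDB query response; the nested lists mirror the shape
# returned for a single query embedding with include=["documents",
# "metadatas", "distances"]. Contents here are illustrative only.
results = {
    "documents": [["Graphs are built from node functions.", "Outputs are named via output_name."]],
    "metadatas": [[{"source": "guide/graphs.md"}, {}]],
    "distances": [[0.25, 0.5]],
}

docs = [
    {
        "content": doc,
        "source": meta.get("source", "unknown"),  # fall back when metadata is missing
        "score": 1 - distance,  # distance: lower is closer; score: higher is better
    }
    for doc, meta, distance in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0],
    )
]

print(docs[0]["score"])   # 0.75
print(docs[1]["source"])  # unknown
```

The `[0]` indexing selects the result columns for the first (and only) query embedding; ChromaDB returns one row of columns per query.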
The `embedding` input is automatically wired to the `embed` node's output.
Define the generation node
Generate an answer using Claude with retrieved context:
```python
from anthropic import Anthropic

anthropic_client = Anthropic()

@node(output_name="answer")
def generate(docs: list[dict], query: str) -> str:
    """
    Generate an answer using Claude Sonnet 4.5.
    Cites sources from retrieved documents.
    """
    # Format context with source attribution
    context_parts = []
    for i, doc in enumerate(docs, 1):
        context_parts.append(f"[{i}] {doc['source']}:\n{doc['content']}")
    context = "\n\n".join(context_parts)

    message = anthropic_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system="""You are a helpful assistant that answers questions based on the provided context.
Always cite your sources using [1], [2], etc.
If the context doesn't contain the answer, say so clearly.""",
        messages=[{
            "role": "user",
            "content": f"Context:\n{context}\n\nQuestion: {query}",
        }],
    )
    return message.content[0].text
```
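The context-formatting loop in `generate` can be exercised on its own, without an API call. The documents below are illustrative placeholders; each retrieved document becomes a numbered, source-attributed block so the model's `[1]`, `[2]` citations map back to sources:

```python
# Stand-in for the docs list produced by the retrieve node (placeholder content).
docs = [
    {"content": "Graphs are composed from node functions.", "source": "guide/graphs.md"},
    {"content": "Outputs are named via output_name.", "source": "guide/nodes.md"},
]

# Same formatting logic as in generate():
context_parts = []
for i, doc in enumerate(docs, 1):
    context_parts.append(f"[{i}] {doc['source']}:\n{doc['content']}")
context = "\n\n".join(context_parts)

print(context)
# [1] guide/graphs.md:
# Graphs are composed from node functions.
#
# [2] guide/nodes.md:
# Outputs are named via output_name.
```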
The `docs` input is wired from the `retrieve` node's output; `query` has no matching upstream output, so it becomes a graph input supplied at run time.
Compose the graph
```python
rag_pipeline = Graph([embed, retrieve, generate], name="rag")

# Check what inputs are needed
print(rag_pipeline.inputs.required)  # ('query',)
print(rag_pipeline.inputs.optional)  # ('top_k',)
```
Hypergraph automatically infers edges based on output and input names. No manual wiring needed!
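This is not hypergraph's internal implementation, but the name-matching idea can be sketched in a few lines: a parameter whose name matches another node's `output_name` becomes an edge, and everything else becomes a graph input. `FakeNode` and `infer_edges` below are hypothetical illustrations, not library API:

```python
import inspect
from dataclasses import dataclass
from typing import Callable

# Hypothetical stand-in for a node object; hypergraph's real node type
# is not shown here, only the name-matching idea.
@dataclass
class FakeNode:
    func: Callable
    output_name: str

# Signatures mirror the tutorial's three nodes (bodies omitted).
def embed(query: str) -> list[float]: ...
def retrieve(embedding: list[float], top_k: int = 5) -> list[dict]: ...
def generate(docs: list[dict], query: str) -> str: ...

nodes = [
    FakeNode(embed, "embedding"),
    FakeNode(retrieve, "docs"),
    FakeNode(generate, "answer"),
]

def infer_edges(nodes):
    """Connect producer -> consumer wherever a parameter name matches an output name."""
    producers = {n.output_name: n.func.__name__ for n in nodes}
    edges = []
    for n in nodes:
        for param in inspect.signature(n.func).parameters:
            if param in producers:
                edges.append((producers[param], n.func.__name__))
    return edges

print(infer_edges(nodes))  # [('embed', 'retrieve'), ('retrieve', 'generate')]
```

Parameters with no matching output (`query`, `top_k`) are exactly the ones that surface as `rag_pipeline.inputs.required` and `.optional` above.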
Run the pipeline
```python
async def main():
    runner = AsyncRunner()
    result = await runner.run(rag_pipeline, {
        "query": "How do I create a graph in hypergraph?",
        "top_k": 5,
    })
    print(f"Answer:\n{result['answer']}")
    print(f"\nRetrieved {len(result['docs'])} documents")

# Run with asyncio
import asyncio
asyncio.run(main())
```