Skip to main content
Test your multi-turn conversation system at scale. The cyclic conversation graph becomes a node inside an evaluation DAG.

Why This Example?

This showcases the flip side of the natural hierarchy: a cycle (conversation) nested inside a DAG (evaluation). Same graph. Different context. Build once, reuse everywhere.

The Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    EVALUATION PIPELINE (DAG)                    │
│                                                                 │
│  load_test_cases → conversation → score → aggregate → report   │
│                         │                                       │
│                         ▼                                       │
│              ┌─────────────────────┐                           │
│              │  CONVERSATION LOOP  │                           │
│              │     (cyclic)        │                           │
│              │                     │                           │
│              │  rag → accumulate   │                           │
│              │   ↑         ↓       │                           │
│              │   └── continue? ────┘                           │
│              └─────────────────────┘                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Implementation

1
Import the conversation graph
2
Reuse the conversation system from the Multi-Turn RAG example:
3
from hypergraph import Graph, node, route, END, AsyncRunner
import json
from statistics import mean

# Assuming you have the conversation graph from multi-turn-rag:
# from my_app.graphs import conversation
#
# Or define it inline (abbreviated for this example):

@node(output_name="response")
async def generate(retrieved_docs: list, user_input: str, history: list) -> str:
    # ... RAG generation logic from multi-turn-rag ...
    pass

@node(output_name="history")
def accumulate(history: list, user_input: str, response: str) -> list:
    return history + [
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": response},
    ]

@route(targets=["rag", END])
def should_continue(history: list) -> str:
    if len(history) >= 10:  # Max 5 turns for eval
        return END
    return "rag"

rag_pipeline = Graph([embed_query, retrieve, generate], name="rag")
conversation = Graph(
    [rag_pipeline.as_node(), accumulate, should_continue],
    name="conversation",
)
4
The conversation graph is the same one used in production. We’re just running it in a different context.
5
Load test cases
6
Define test cases with expected outcomes:
7
@node(output_name="test_cases")
def load_test_cases(dataset_path: str) -> list[dict]:
    """
    Load test conversations.

    Each test case has:
    - initial_query: The first user message
    - follow_ups: List of follow-up questions
    - expected_topics: Topics that should be covered
    - expected_facts: Facts that should be mentioned
    """
    with open(dataset_path) as f:
        return json.load(f)
8
Example test data format:
9
[
  {
    "id": "test_001",
    "initial_query": "What is hypergraph?",
    "follow_ups": [
      "How do I install it?",
      "Show me a simple example"
    ],
    "expected_topics": ["workflow", "graph", "nodes"],
    "expected_facts": ["pip install", "@node decorator"]
  },
  {
    "id": "test_002",
    "initial_query": "How do I create a multi-turn conversation?",
    "follow_ups": [
      "What about handling history?",
      "How do I know when to stop?"
    ],
    "expected_topics": ["loop", "history", "END"],
    "expected_facts": ["@route", "accumulate"]
  }
]
10
Run conversations
11
Execute the conversation graph for each test case:
12
@node(output_name="conversation_result")
async def run_conversation(
    test_case: dict,
    system_prompt: str = "You are a helpful assistant.",
) -> dict:
    """
    Run the conversation graph with a test case.

    This demonstrates running a cyclic graph as part of a larger workflow.
    """
    runner = AsyncRunner()

    # Start with the initial query
    state = {
        "user_input": test_case["initial_query"],
        "history": [],
        "system_prompt": system_prompt,
    }

    # Run the first turn
    result = await runner.run(conversation, state)
    history = result["history"]

    # Run follow-up turns
    for follow_up in test_case.get("follow_ups", []):
        state = {
            "user_input": follow_up,
            "history": history,
            "system_prompt": system_prompt,
        }
        result = await runner.run(conversation, state)
        history = result["history"]

    return {
        "test_case_id": test_case.get("id", "unknown"),
        "history": history,
        "final_response": history[-1]["content"] if history else "",
        "turn_count": len(history) // 2,
    }
13
This node runs an entire conversation (multiple turns) as a single operation. The conversation’s cyclic nature is hidden from the evaluation DAG.
14
Score conversations
15
Evaluate each conversation against expected outcomes:
16
@node(output_name="scores")
def score_conversation(conversation_result: dict, test_case: dict) -> dict:
    """
    Score a single conversation against expected outcomes.
    """
    history = conversation_result["history"]
    all_responses = " ".join(
        msg["content"] for msg in history if msg["role"] == "assistant"
    )

    # Topic coverage
    expected_topics = test_case.get("expected_topics", [])
    topics_covered = sum(
        1 for topic in expected_topics if topic.lower() in all_responses.lower()
    )
    topic_coverage = topics_covered / len(expected_topics) if expected_topics else 1.0

    # Fact accuracy
    expected_facts = test_case.get("expected_facts", [])
    facts_mentioned = sum(
        1 for fact in expected_facts if fact.lower() in all_responses.lower()
    )
    fact_accuracy = facts_mentioned / len(expected_facts) if expected_facts else 1.0

    # Response quality
    avg_response_length = mean(
        len(msg["content"]) for msg in history if msg["role"] == "assistant"
    ) if history else 0

    return {
        "test_case_id": conversation_result["test_case_id"],
        "topic_coverage": topic_coverage,
        "fact_accuracy": fact_accuracy,
        "turn_count": conversation_result["turn_count"],
        "avg_response_length": avg_response_length,
        "passed": topic_coverage >= 0.8 and fact_accuracy >= 0.8,
    }
17
Aggregate and report
18
Collect results and generate a report:
19
@node(output_name="report")
def aggregate_results(all_scores: list[dict]) -> dict:
    """
    Aggregate scores across all test cases.
    """
    if not all_scores:
        return {"error": "No test cases evaluated"}

    passed_count = sum(1 for s in all_scores if s["passed"])

    return {
        "total_tests": len(all_scores),
        "passed": passed_count,
        "failed": len(all_scores) - passed_count,
        "pass_rate": passed_count / len(all_scores),
        "avg_topic_coverage": mean(s["topic_coverage"] for s in all_scores),
        "avg_fact_accuracy": mean(s["fact_accuracy"] for s in all_scores),
        "avg_turns": mean(s["turn_count"] for s in all_scores),
        "detailed_results": all_scores,
    }


@node(output_name="formatted_report")
def format_report(report: dict) -> str:
    """Generate human-readable report."""
    lines = [
        "=" * 50,
        "EVALUATION REPORT",
        "=" * 50,
        f"Total tests: {report['total_tests']}",
        f"Passed: {report['passed']} ({report['pass_rate']:.1%})",
        f"Failed: {report['failed']}",
        "",
        "Metrics:",
        f"  Topic coverage: {report['avg_topic_coverage']:.1%}",
        f"  Fact accuracy:  {report['avg_fact_accuracy']:.1%}",
        f"  Avg turns:      {report['avg_turns']:.1f}",
        "=" * 50,
    ]

    # Add failed test details
    failed = [r for r in report["detailed_results"] if not r["passed"]]
    if failed:
        lines.append("\nFailed tests:")
        for f in failed:
            lines.append(f"  - {f['test_case_id']}: "
                        f"topics={f['topic_coverage']:.0%}, "
                        f"facts={f['fact_accuracy']:.0%}")

    return "\n".join(lines)
20
Compose the evaluation pipeline
21
# For a single test case
single_eval = Graph([
    run_conversation,
    score_conversation,
], name="single_eval")

# For the full evaluation suite
evaluation_pipeline = Graph([
    load_test_cases,
    aggregate_results,
    format_report,
], name="evaluation")
22
Run the evaluation
23
async def run_evaluation(dataset_path: str) -> str:
    """
    Run the full evaluation pipeline.
    """
    runner = AsyncRunner()

    # Load test cases
    with open(dataset_path) as f:
        test_cases = json.load(f)

    # Run conversations in parallel using map
    conversation_results = await runner.map(
        single_eval,
        {"test_case": test_cases},
        map_over="test_case",
        max_concurrency=5,  # Limit parallel conversations
    )

    # Extract scores
    all_scores = [r["scores"] for r in conversation_results]

    # Generate report
    report = aggregate_results.func(all_scores)
    formatted = format_report.func(report)

    return formatted


# Example usage:
import asyncio
report = asyncio.run(run_evaluation("test_conversations.json"))
print(report)

Key Patterns

Cycle Inside DAG

The conversation graph (cyclic) runs inside the evaluation_pipeline (DAG):
conversation_results = await runner.map(
    single_eval,  # Contains the cyclic conversation
    {"test_case": test_cases},
    map_over="test_case",
)

Same Graph, Different Context

The conversation graph is the same one used in production:
# In production:
result = await runner.run(conversation, {"user_input": user_query, "history": []})

# In evaluation:
result = await runner.run(conversation, {"user_input": test_case["initial_query"], "history": []})

Parallel Test Execution

Run multiple test conversations concurrently:
conversation_results = await runner.map(
    single_eval,
    {"test_case": test_cases},
    map_over="test_case",
    max_concurrency=5,
)

Pure Scoring Functions

Scoring is a pure function — easy to test:
def test_scoring():
    result = score_conversation.func(
        conversation_result={"history": [...], "turn_count": 3},
        test_case={"expected_topics": ["graph"], "expected_facts": ["@node"]}
    )
    assert "topic_coverage" in result
    assert 0 <= result["topic_coverage"] <= 1

Extensions

A/B Testing

Compare two conversation implementations:
@node(output_name="comparison")
async def compare_implementations(
    test_case: dict,
    impl_a: Graph,
    impl_b: Graph,
) -> dict:
    """Compare two conversation implementations."""
    runner = AsyncRunner()

    # Run both implementations
    result_a = await runner.run(impl_a, {
        "user_input": test_case["initial_query"],
        "history": [],
    })
    result_b = await runner.run(impl_b, {
        "user_input": test_case["initial_query"],
        "history": [],
    })

    # Score both
    score_a = score_conversation.func(
        {"history": result_a["history"], "test_case_id": test_case["id"], "turn_count": 1},
        test_case,
    )
    score_b = score_conversation.func(
        {"history": result_b["history"], "test_case_id": test_case["id"], "turn_count": 1},
        test_case,
    )

    return {
        "test_case_id": test_case["id"],
        "impl_a_score": score_a["topic_coverage"],
        "impl_b_score": score_b["topic_coverage"],
        "winner": "a" if score_a["topic_coverage"] > score_b["topic_coverage"] else "b",
    }

LLM-as-Judge Scoring

Use an LLM to evaluate response quality:
from anthropic import Anthropic

anthropic_client = Anthropic()

@node(output_name="llm_score")
async def llm_evaluate(conversation_result: dict, test_case: dict) -> float:
    """Use Claude to evaluate conversation quality."""
    history = conversation_result["history"]
    conversation_text = "\n".join(
        f"{msg['role']}: {msg['content']}" for msg in history
    )

    message = anthropic_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=100,
        messages=[{
            "role": "user",
            "content": f"""Rate this conversation from 0 to 1 based on:
- Helpfulness
- Accuracy
- Clarity

Conversation:
{conversation_text}

Expected topics: {test_case.get('expected_topics', [])}

Return only a number between 0 and 1.""",
        }],
    )

    return float(message.content[0].text.strip())

CI Integration

Run evaluations in continuous integration:
pytest_test.py
import pytest

@pytest.mark.asyncio
async def test_conversation_quality():
    """Run in CI to catch regressions."""
    report = await run_evaluation("tests/fixtures/eval_dataset.json")

    # Parse report
    lines = report.split("\n")
    pass_rate_line = [l for l in lines if "Passed:" in l][0]
    pass_rate = float(pass_rate_line.split("(")[1].split("%")[0]) / 100

    assert pass_rate >= 0.9, f"Pass rate {pass_rate:.1%} below threshold"

What’s Next?

Multi-Turn RAG

The conversation system being evaluated

Prompt Optimization

Iteratively improve prompts using evaluation

Batch Processing

More on runner.map() for parallel execution

Hierarchical Composition

Understanding nested graph patterns

Build docs developers (and LLMs) love