Agentic workflows iterate until a goal is achieved. The graph cycles back to earlier nodes based on runtime conditions.
## When to Use
- Multi-turn conversations: User asks, system responds, user follows up
- Iterative refinement: Generate, evaluate, improve until quality threshold
- Tool-using agents: Call tools, observe results, decide next action
- Retry patterns: Attempt, check result, retry if needed
## The Core Pattern
Use @route to decide whether to continue or stop:
```python
from hypergraph import Graph, node, route, END, SyncRunner

@node(output_name="draft")
def generate(prompt: str, feedback: str = "") -> str:
    """Generate content, incorporating any feedback."""
    full_prompt = f"{prompt}\n\nFeedback to address: {feedback}" if feedback else prompt
    return llm.generate(full_prompt)

@node(output_name="score")
def evaluate(draft: str) -> float:
    """Score the draft quality (0-1)."""
    return quality_model.score(draft)

@node(output_name="feedback")
def critique(draft: str, score: float) -> str:
    """Generate feedback for improvement."""
    if score >= 0.8:
        return ""  # Good enough
    return critic_model.generate(f"Critique this draft:\n{draft}")

@route(targets=["generate", END])
def should_continue(score: float, attempts: int) -> str:
    """Decide whether to continue refining."""
    if score >= 0.8:
        return END  # Quality achieved
    if attempts >= 5:
        return END  # Max attempts reached
    return "generate"  # Keep refining

@node(output_name="attempts")
def count_attempts(attempts: int = 0) -> int:
    """Track iteration count."""
    return attempts + 1

# Build the loop
refinement_loop = Graph([
    generate,
    evaluate,
    critique,
    count_attempts,
    should_continue,
])

# Run until done
runner = SyncRunner()
result = runner.run(refinement_loop, {"prompt": "Write a haiku about Python"})
print(f"Final draft: {result['draft']}")
print(f"Final score: {result['score']}")
print(f"Attempts: {result['attempts']}")
```
## How It Works
```
┌─────────────────────────────────────────┐
│                                         │
│   generate → evaluate → critique        │
│      ↑                      ↓           │
│      └──── should_continue ────→ END    │
│                                         │
└─────────────────────────────────────────┘
```
1. **generate** creates a draft: the initial generation, or a refinement based on feedback
2. **evaluate** scores it: a quality assessment (0-1 score)
3. **critique** provides feedback: improvement suggestions if the score is too low
4. **should_continue** decides: return END and the graph completes, or return "generate" to loop back
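The same control flow can be sketched in plain Python, independent of hypergraph. The `fake_generate` and `fake_score` functions below are hypothetical stand-ins for the model calls:

```python
def fake_generate(prompt: str, feedback: str = "") -> str:
    # Stand-in for llm.generate: revising with feedback "improves" the draft
    return prompt + " (revised)" if feedback else prompt

def fake_score(draft: str) -> float:
    # Stand-in for quality_model.score: revised drafts clear the threshold
    return 0.9 if draft.endswith("(revised)") else 0.5

def refine(prompt: str, threshold: float = 0.8, max_attempts: int = 5):
    feedback, attempts = "", 0
    while True:
        attempts += 1                             # count_attempts
        draft = fake_generate(prompt, feedback)   # generate
        score = fake_score(draft)                 # evaluate
        if score >= threshold or attempts >= max_attempts:
            return draft, score, attempts         # should_continue -> END
        feedback = "Sharpen the imagery."         # critique -> loop back

draft, score, attempts = refine("Write a haiku about Python")
```

The graph version expresses this same loop declaratively, letting the runner handle the cycle.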
## The END Sentinel
END is a special value that terminates execution:
```python
from hypergraph import END

@route(targets=["next_step", END])
def check_done(result: dict) -> str:
    if result["complete"]:
        return END
    return "next_step"
```
Always include END in your targets when you want the option to stop.
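The sentinel pattern itself is ordinary Python: a unique object that routing code compares against by identity. A minimal sketch (this local `END` is a stand-in for the one hypergraph exports):

```python
END = object()  # unique sentinel; real code imports END from hypergraph

def check_done(result: dict):
    # Routes either terminate (END) or name the next node
    return END if result["complete"] else "next_step"

decision = check_done({"complete": True})
```

Because `object()` is unique, `decision is END` is unambiguous even if a node's real output happens to be a string.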
## Multi-Turn Conversation
A conversation loop that continues until the user says goodbye:
```python
@node(output_name="response")
def generate_response(messages: list, context: str) -> str:
    """Generate assistant response."""
    return llm.chat(messages, system=context)

@node(output_name="messages")
def update_history(messages: list, user_input: str, response: str) -> list:
    """Append new messages to history."""
    return messages + [
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": response},
    ]

@route(targets=["generate_response", END])
def should_continue_chat(response: str, messages: list) -> str:
    """Check if conversation should continue."""
    # End if assistant said goodbye or max turns reached
    if "goodbye" in response.lower() or len(messages) > 20:
        return END
    return "generate_response"

chat_loop = Graph([
    generate_response,
    update_history,
    should_continue_chat,
])
```
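Turn by turn, this loop appends a user/assistant pair and then checks the termination condition. A plain-Python sketch with scripted assistant replies (the names here are hypothetical stand-ins, not hypergraph API):

```python
def run_chat(scripted_replies, max_messages=20):
    # scripted_replies stands in for successive llm.chat outputs
    messages = []
    for reply in scripted_replies:
        messages = messages + [                    # update_history
            {"role": "user", "content": "..."},
            {"role": "assistant", "content": reply},
        ]
        # should_continue_chat's termination check
        if "goodbye" in reply.lower() or len(messages) > max_messages:
            break
    return messages

history = run_chat(["Hello!", "Here is an example.", "Goodbye!"])
```

The conversation ends as soon as a reply contains "goodbye", mirroring the route above.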
## Tool-Using Agent

An agent that decides which tool to call:
```python
@node(output_name="action")
def decide_action(observation: str, goal: str) -> dict:
    """Decide next action based on observation."""
    return agent_model.decide(observation, goal)

@node(output_name="observation")
def execute_action(action: dict) -> str:
    """Execute the chosen action."""
    tool_name = action["tool"]
    tool_args = action["args"]
    return tools[tool_name](**tool_args)

@route(targets=["decide_action", END])
def check_goal_achieved(action: dict, observation: str) -> str:
    """Check if the goal is achieved."""
    if action["tool"] == "finish":
        return END
    return "decide_action"

agent_loop = Graph([decide_action, execute_action, check_goal_achieved])
```
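The decide → execute → check cycle can be sketched in plain Python with a toy tools dict. The `decide` policy below is a hypothetical stand-in for `agent_model.decide`:

```python
def run_agent(goal, tools, decide, max_steps=10):
    # decide stands in for agent_model.decide: observation -> action dict
    observation = goal
    for _ in range(max_steps):
        action = decide(observation)          # decide_action
        if action["tool"] == "finish":        # check_goal_achieved -> END
            return observation
        observation = tools[action["tool"]](**action["args"])  # execute_action
    return observation

tools = {"add": lambda a, b: str(a + b)}

def decide(observation):
    # Toy policy: call the add tool once, then finish
    if observation == "compute 2+3":
        return {"tool": "add", "args": {"a": 2, "b": 3}}
    return {"tool": "finish", "args": {}}

answer = run_agent("compute 2+3", tools, decide)
```

The graph version lets the route drive the cycle instead of an explicit `for` loop.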
## Ordering with emit/wait_for
In cyclic graphs, you sometimes need a node to wait for another node to finish — even when there’s no direct data dependency. Use emit and wait_for to enforce execution order.
**The problem:** In a chat loop, should_continue reads messages. But accumulate also reads messages and produces the updated version. Without ordering, should_continue might see stale messages from the previous turn.

**The fix:** accumulate emits a signal when it finishes; should_continue waits for that signal.
```python
@node(output_name="response")
def generate(messages: list) -> str:
    return llm.chat(messages)

@node(output_name="messages", emit="turn_done")
def accumulate(messages: list, response: str) -> list:
    return messages + [{"role": "assistant", "content": response}]

@route(targets=["generate", END], wait_for="turn_done")
def should_continue(messages: list) -> str:
    if len(messages) >= 10:
        return END
    return "generate"
```
How it works:

- `emit="turn_done"` declares an ordering-only output. A sentinel value is auto-produced when accumulate runs; your function doesn’t return it.
- `wait_for="turn_done"` declares an ordering-only input. should_continue won’t run until turn_done exists and is fresh.
- emit names appear in node.outputs but not in node.data_outputs, and they are filtered from the final result.
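The stale-read hazard is easy to see in plain Python: if the route snapshots messages before accumulate appends to it, the decision is made on last turn's state. A sketch of the two interleavings:

```python
messages = [{"role": "user", "content": "hi"}]

# Route runs BEFORE accumulate: it sees last turn's single message
stale_view = list(messages)

# accumulate runs, producing the updated list (and, in hypergraph,
# emitting the turn_done signal)
messages = messages + [{"role": "assistant", "content": "hello"}]

# Route waits for turn_done: it sees the fresh, two-message list
fresh_view = list(messages)
```

wait_for guarantees the route always gets the second interleaving.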
### When to use emit/wait_for vs data edges
- If node B needs node A’s output value → use a data edge (parameter matching)
- If node B just needs to run after node A → use emit/wait_for
## Entry Points
When a parameter is both an input and output of a cycle (like history or iteration), it becomes an entrypoint parameter — an initial value needed to start the first iteration. Provide these in the values dict when calling runner.run():
```python
result = runner.run(graph, {
    "prompt": "...",
    "history": [],   # Entry point: initial value before first iteration
    "iteration": 0,  # Entry point: starting counter
})
```
You can check what entrypoints a graph has via graph.inputs.entrypoints:
```python
print(graph.inputs.entrypoints)
# {'accumulate_history': ('history',), 'increment': ('iteration',)}
```
## Tracking State Across Iterations
Use a node to accumulate state:
```python
@node(output_name="history")
def accumulate_history(history: list, new_item: str) -> list:
    """Append new item to history."""
    return history + [new_item]

@node(output_name="iteration")
def increment(iteration: int = 0) -> int:
    """Track iteration count."""
    return iteration + 1
```
Provide initial values when running:
```python
result = runner.run(graph, {
    "history": [],    # Start with empty history
    "iteration": 0,   # Start at iteration 0
    "prompt": "...",
})
```
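Outside of hypergraph, the same state threading is an ordinary loop; the initial values you pass in the run call play the role of the starting variables here:

```python
def accumulate_history(history: list, new_item: str) -> list:
    return history + [new_item]

def increment(iteration: int = 0) -> int:
    return iteration + 1

history, iteration = [], 0   # the entrypoint values
for item in ["draft-1", "draft-2"]:
    history = accumulate_history(history, item)
    iteration = increment(iteration)
```

Each iteration of the cycle consumes the previous values and produces the next ones.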
## Shared Outputs in a Cycle
When multiple nodes produce the same output name in a cycle — like two nodes both producing messages — the graph needs to know their execution order. There are two approaches: ordering signals (emit/wait_for) and explicit edges.
### With Explicit Edges
When cycles share common output names like messages, df, or state, explicit edges let you declare the topology directly instead of inventing signal names. Pass edges to Graph() to disable auto-inference and wire edges manually:
```python
from hypergraph import Graph, node, route, END

@node(output_name="messages")
def add_query(messages: list, query: str) -> list:
    return [*messages, {"role": "user", "content": query}]

@node(output_name="response")
def generate(messages: list) -> str:
    return llm.chat(messages)

@node(output_name="messages")
def add_response(messages: list, response: str) -> list:
    return [*messages, {"role": "assistant", "content": response}]

@route(targets=["add_query", END])
def should_continue(messages: list) -> str:
    return END if len(messages) >= 10 else "add_query"

chat = Graph(
    [add_query, generate, add_response, should_continue],
    edges=[
        (add_query, generate),           # messages
        (generate, add_response),        # response
        (add_response, should_continue), # messages
        (add_response, add_query),       # messages (cycle)
    ],
)
```
Each edge is a (source, target) tuple. Values are auto-detected from the intersection of source outputs and target inputs.
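That auto-detection amounts to a set intersection. A sketch of the idea using plain sets (not the real hypergraph internals):

```python
def edge_values(source_outputs: set, target_inputs: set) -> set:
    # The value(s) carried by an edge: outputs of the source that
    # the target accepts as inputs
    return source_outputs & target_inputs

# For the (add_query, generate) edge in the chat graph above:
carried = edge_values({"messages"}, {"messages"})
```

If the intersection is empty, the edge carries no data and would only make sense as an ordering constraint.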
When to use which approach:

| Situation | Use |
|---|---|
| DAGs (no cycles) | Auto-inference (no edges needed) |
| Cycles with unique output names | Auto-inference + emit/wait_for for ordering |
| Cycles with shared output names (messages, state) | Explicit edges |
## Preventing Infinite Loops
Hypergraph detects potential infinite loops at runtime:
```python
# This will raise InfiniteLoopError if the loop runs too long
runner = SyncRunner()
result = runner.run(graph, inputs, max_iterations=100)  # Safety limit
```
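The safety limit is a simple idea: cap the number of iterations and raise if the cap is hit. A plain-Python sketch (this `InfiniteLoopError` is a local stand-in for the exception hypergraph raises):

```python
class InfiniteLoopError(RuntimeError):
    pass

def run_with_limit(step, is_done, max_iterations=100):
    # step() advances the loop; is_done() is the termination check
    for _ in range(max_iterations):
        step()
        if is_done():
            return
    raise InfiniteLoopError(f"exceeded {max_iterations} iterations")

counter = {"n": 0}
run_with_limit(lambda: counter.update(n=counter["n"] + 1),
               lambda: counter["n"] >= 3)
```

A route that can never reach END hits the cap instead of spinning forever.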
Best practices:

- Always have a termination condition (max attempts, quality threshold)
- Include END in your route targets
- Track the iteration count and bail out if needed
## What’s Next?