Pause execution for human input. Resume when ready.
  • @interrupt - Decorator to create a pause point (like @node but pauses on None)
  • PauseInfo - Metadata about the pause (what value is surfaced, where to resume)
  • Handlers - Auto-resolve interrupts programmatically (testing, automation)

The Problem

Many workflows need human judgment at key points: approving a draft, confirming a destructive action, providing feedback on generated content. You need a way to pause the graph, surface a value to the user, and resume with their response.

Basic Pause and Resume

The @interrupt decorator creates a pause point. Inputs come from the function signature, outputs from output_name. When the handler returns None, execution pauses.
from hypergraph import Graph, node, AsyncRunner, interrupt

@node(output_name="draft")
def generate_draft(prompt: str) -> str:
    return f"Blog post about: {prompt}"

@interrupt(output_name="decision")
def approval(draft: str) -> str | None:
    return None  # pause for human review

@node(output_name="result")
def finalize(decision: str) -> str:
    return f"Published: {decision}"

graph = Graph([generate_draft, approval, finalize])
runner = AsyncRunner()

# First run: pauses at the interrupt
result = await runner.run(graph, {"prompt": "Python async"})

assert result.paused
assert result.pause.value == "Blog post about: Python async"
assert result.pause.response_key == "decision"

# Resume with the user's response
result = await runner.run(graph, {
    "prompt": "Python async",
    result.pause.response_key: "Looks great, publish it!",
})

assert result["result"] == "Published: Looks great, publish it!"
The flow has three steps:
  1. Run the graph: execution pauses at the interrupt.
  2. Inspect pause.value: see what the user needs to review.
  3. Resume with the response: pass it via pause.response_key.
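
The run, inspect, resume cycle can be wrapped in a small driver loop. The following is a minimal sketch, not part of hypergraph: run_until_complete and ask are hypothetical names, and run stands for any coroutine function shaped like runner.run(graph, values). It relies only on the paused, pause.value, and pause.response_key attributes shown above, and it is exercised against a stand-in runner so that it executes without the library installed.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable


async def run_until_complete(
    run: Callable[[Any, dict], Awaitable[Any]],  # e.g. runner.run (assumed shape)
    graph: Any,
    values: dict,
    ask: Callable[[Any], Any],  # hypothetical callback that collects the human's answer
) -> Any:
    """Run, answer each pause via `ask`, and re-run until nothing is paused."""
    values = dict(values)  # copy so the caller's dict is not mutated
    result = await run(graph, values)
    while result.paused:
        # Store the answer under the key the pause asks for, then run again.
        values[result.pause.response_key] = ask(result.pause.value)
        result = await run(graph, values)
    return result


# Stand-in for runner.run so the sketch executes without hypergraph.
async def fake_run(graph: Any, values: dict) -> Any:
    if "decision" not in values:
        pause = SimpleNamespace(
            value=f"Blog post about: {values['prompt']}",
            response_key="decision",
        )
        return SimpleNamespace(paused=True, pause=pause)
    return SimpleNamespace(
        paused=False, pause=None, output=f"Published: {values['decision']}"
    )


final = asyncio.run(
    run_until_complete(
        fake_run, None, {"prompt": "Python async"}, ask=lambda draft: "Looks great!"
    )
)
```

With a real graph, ask would typically prompt the user (for example via input() or a web form), and run would be the run method of an AsyncRunner instance.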

RunResult Properties

When paused, the RunResult has:
| Property | Description |
| --- | --- |
| result.paused | True when execution is paused |
| result.pause.value | The input value surfaced to the caller |
| result.pause.node_name | Name of the interrupt node that paused |
| result.pause.output_param | Output parameter name |
| result.pause.response_key | Key to use in the values dict when resuming |
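
As an illustration of how these properties fit together, the sketch below turns a paused result into a one-line prompt for a reviewer. describe_pause is a hypothetical helper, not a hypergraph function, and the SimpleNamespace object is only a stand-in carrying the attributes listed in the table.

```python
from types import SimpleNamespace
from typing import Any


def describe_pause(result: Any) -> str:
    """Build a one-line reviewer prompt from a run result (hypothetical helper)."""
    if not result.paused:
        return "Run completed; nothing to review."
    pause = result.pause
    return (
        f"Node '{pause.node_name}' is waiting for '{pause.output_param}'. "
        f"Review: {pause.value!r}. Resume with key '{pause.response_key}'."
    )


# Stand-in for a paused RunResult, using only the documented attributes.
paused_result = SimpleNamespace(
    paused=True,
    pause=SimpleNamespace(
        node_name="approval",
        output_param="decision",
        value="Blog post about: Python async",
        response_key="decision",
    ),
)
message = describe_pause(paused_result)
```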

Multi-Turn Chat with Human Input

Combine @interrupt with agentic loops for a multi-turn conversation where the user provides input each turn.
from hypergraph import Graph, node, route, END, AsyncRunner, interrupt

# Pause and wait for user input
@interrupt(output_name="user_input")
def ask_user(messages: list) -> str | None:
    return None  # always pause for human input

@node(output_name="messages")
def add_user_message(messages: list, user_input: str) -> list:
    return messages + [{"role": "user", "content": user_input}]

@node(output_name="response")
def generate(messages: list) -> str:
    return llm.chat(messages)  # your LLM client

@node(output_name="messages", emit="turn_done")
def accumulate(messages: list, response: str) -> list:
    return messages + [{"role": "assistant", "content": response}]

@route(targets=["ask_user", END], wait_for="turn_done")
def should_continue(messages: list) -> str:
    if len(messages) >= 20:
        return END
    return "ask_user"

graph = Graph([ask_user, add_user_message, generate, accumulate, should_continue])

# Pre-fill messages so the first step (ask_user) can run immediately
chat = graph.bind(messages=[])

runner = AsyncRunner()

# Turn 1: graph pauses at ask_user (shows empty messages)
result = await runner.run(chat, {})
assert result.paused
assert result.pause.node_name == "ask_user"

# Resume with user's first message
result = await runner.run(chat, {"user_input": "Hello!"})
# Pauses again at ask_user for the next turn
assert result.paused

# Resume with second message
result = await runner.run(chat, {"user_input": "Tell me more"})
Key patterns:
  • .bind(messages=[]) pre-fills the seed input so .run({}) works with no values
  • Interrupt as first step: the graph pauses immediately, asking the user for input
  • emit="turn_done" + wait_for="turn_done": ensures should_continue sees the fully updated messages
  • Each resume replays the graph, providing all previous responses

Alternative: Explicit Edges

The same chat loop can be written without emit/wait_for signals. When multiple nodes produce messages, explicit edges declare the topology directly:
from hypergraph import Graph, node, route, END, AsyncRunner, interrupt

@interrupt(output_name="query")
def ask_user(response: str) -> str | None:
    return None  # always pause for human input

@node(output_name="messages")
def add_query(messages: list, query: str) -> list:
    return [*messages, {"role": "user", "content": query}]

@node(output_name="response")
def generate(messages: list) -> str:
    return llm.chat(messages)

@node(output_name="messages")
def add_response(messages: list, response: str) -> list:
    return [*messages, {"role": "assistant", "content": response}]

graph = Graph(
    [ask_user, add_query, generate, add_response],
    edges=[
        (ask_user, add_query),                   # query
        (add_query, generate),                    # messages
        (generate, add_response),                 # response
        (add_response, ask_user),                 # ordering only
        (add_response, add_query),                # messages (cycle)
    ],
)

chat = graph.bind(messages=[])
runner = AsyncRunner()

# Turn 1: pauses at ask_user
result = await runner.run(chat, {})
assert result.paused

# Resume with user's first message
result = await runner.run(chat, {"query": "Hello!"})
assert result.paused  # pauses again for next turn

Auto-Resolve with Handlers

For testing or automation, the handler function can resolve the interrupt without human input: return a value to auto-resolve, or return None to pause.
@interrupt(output_name="decision")
def approval(draft: str) -> str:
    return "auto-approved"  # always resolves, never pauses

graph = Graph([generate_draft, approval, finalize])
result = await runner.run(graph, {"prompt": "Python async"})

# No pause — handler resolved it
assert result["result"] == "Published: auto-approved"

Conditional Pause

A single handler can do both, deciding on each call whether to auto-resolve (return a value) or pause (return None):
@interrupt(output_name="decision")
def approval(draft: str) -> str | None:
    if "LGTM" in draft:
        return "auto-approved"
    return None  # pause for human review
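
One way to keep such a rule easy to unit test is to put the decision in a plain function and have the @interrupt handler simply return its result. The sketch below shows only that plain function; review_decision and its marker parameter are hypothetical names, not part of hypergraph.

```python
from typing import Optional


def review_decision(draft: str, marker: str = "LGTM") -> Optional[str]:
    """Return a decision to auto-resolve, or None to request human review.

    Hypothetical helper: an @interrupt handler could return
    review_decision(draft) unchanged.
    """
    if marker in draft:
        return "auto-approved"
    return None  # no marker found: a human has to decide


auto = review_decision("LGTM: ship it")
manual = review_decision("Needs another pass")
```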

Async Handlers

Handlers can be async:
@interrupt(output_name="decision")
async def approval(draft: str) -> str:
    """Use an LLM to auto-review the draft."""
    return await call_llm(f"Review this draft: {draft}")

Multiple Sequential Interrupts

A graph can have multiple interrupts. Execution pauses at each one in topological order.
@node(output_name="draft")
def generate(prompt: str) -> str:
    return f"Draft: {prompt}"

@interrupt(output_name="feedback")
def review(draft: str) -> str | None:
    return None  # pause for reviewer

@interrupt(output_name="final_draft")
def edit(feedback: str) -> str | None:
    return None  # pause for editor

@node(output_name="result")
def publish(final_draft: str) -> str:
    return f"Published: {final_draft}"

graph = Graph([generate, review, edit, publish])
runner = AsyncRunner()

# Pause 1: review
r1 = await runner.run(graph, {"prompt": "hello"})
assert r1.pause.node_name == "review"

# Pause 2: edit (provide review response)
r2 = await runner.run(graph, {
    "prompt": "hello",
    "feedback": "Needs more detail",
})
assert r2.pause.node_name == "edit"
assert r2.pause.value == "Needs more detail"

# Complete (provide both responses)
r3 = await runner.run(graph, {
    "prompt": "hello",
    "feedback": "Needs more detail",
    "final_draft": "Detailed draft about hello",
})
assert r3["result"] == "Published: Detailed draft about hello"
Each resume call replays the graph from the start, with the previously collected responses supplied as input values. An interrupt whose output is already in the state skips its pause.
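
The replay behaviour can be pictured with a toy model. This is not hypergraph's implementation, only a sketch under simplifying assumptions: steps run in a fixed order, each step has one input and one output, and replay and Step are hypothetical names.

```python
from typing import Any, Callable, Optional

# Each step: (input name, output name, function, is_interrupt)
Step = tuple[str, str, Callable[[Any], Any], bool]


def replay(
    steps: list[Step], values: dict[str, Any]
) -> tuple[dict[str, Any], Optional[str]]:
    """Run steps in order; return (state, name of the output that paused, or None)."""
    state = dict(values)
    for input_name, output_name, fn, is_interrupt in steps:
        if is_interrupt and output_name in state:
            continue  # response already supplied: skip the pause
        produced = fn(state[input_name])
        if is_interrupt and produced is None:
            return state, output_name  # pause: caller must supply this key
        state[output_name] = produced
    return state, None


steps: list[Step] = [
    ("prompt", "draft", lambda p: f"Draft: {p}", False),
    ("draft", "feedback", lambda d: None, True),        # review
    ("feedback", "final_draft", lambda f: None, True),  # edit
    ("final_draft", "result", lambda d: f"Published: {d}", False),
]

_, first_pause = replay(steps, {"prompt": "hello"})
_, second_pause = replay(steps, {"prompt": "hello", "feedback": "Needs more detail"})
state, third_pause = replay(
    steps,
    {
        "prompt": "hello",
        "feedback": "Needs more detail",
        "final_draft": "Detailed draft about hello",
    },
)
```

Each call starts from the first step again, which is why every resume has to carry all responses collected so far.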

Nested Graph Interrupts

Interrupts inside nested graphs propagate the pause to the outer graph. The node_name is prefixed with the nested graph’s name.
# Inner graph with an interrupt
@interrupt(output_name="y")
def approval(x: str) -> str | None:
    return None

inner = Graph([approval], name="inner")

@node(output_name="x")
def produce(query: str) -> str:
    return query

@node(output_name="result")
def consume(y: str) -> str:
    return f"got: {y}"

outer = Graph([produce, inner.as_node(), consume])
runner = AsyncRunner()

result = await runner.run(outer, {"query": "hello"})

assert result.paused
assert result.pause.node_name == "inner/approval"
assert result.pause.response_key == "inner.y"
The response_key uses dot-separated paths for nested interrupts: "inner.y" means the output y inside the graph node inner.
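
The relationship between the two naming schemes can be sketched as follows. nested_response_key is a hypothetical helper, and the rule it encodes (enclosing graph names joined with dots, followed by the output name) is an assumption extrapolated from the "inner/approval" and "inner.y" pair above. In real code, reading result.pause.response_key directly is the safer choice.

```python
def nested_response_key(node_name: str, output_param: str) -> str:
    """Derive a dot-separated resume key from a '/'-separated node name.

    Assumption: the key is the enclosing graph names joined with '.',
    followed by the output name.
    """
    *graph_path, _node = node_name.split("/")  # drop the interrupt node's own name
    return ".".join([*graph_path, output_param])


nested_key = nested_response_key("inner/approval", "y")
top_level_key = nested_response_key("approval", "decision")
```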

Runner Compatibility

Only AsyncRunner supports interrupts. SyncRunner raises IncompatibleRunnerError at runtime if the graph contains interrupt nodes.
from hypergraph import SyncRunner
from hypergraph.exceptions import IncompatibleRunnerError

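runner = SyncRunner()

# graph_with_interrupt: any graph that contains an @interrupt node
try:
    runner.run(graph_with_interrupt, {"query": "hello"})
except IncompatibleRunnerError as exc:
    print(f"Use AsyncRunner instead: {exc}")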
Similarly, AsyncRunner.map() does not support interrupts: a graph with interrupts cannot be used with map().
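
Code that is otherwise synchronous can still drive an async runner through asyncio.run. The sketch below assumes that runner.run is an ordinary coroutine function, as the await calls on this page suggest. run_blocking is a hypothetical helper, and fake_run is a stand-in so the example executes without hypergraph.

```python
import asyncio
from typing import Any, Callable, Coroutine


def run_blocking(
    run: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
) -> Any:
    """Call an async `run` from synchronous code and wait for its result.

    Must not be called from inside a running event loop, where
    asyncio.run raises RuntimeError.
    """
    return asyncio.run(run(*args, **kwargs))


# Stand-in for AsyncRunner().run so the sketch executes on its own.
async def fake_run(graph: Any, values: dict) -> dict:
    return {"echo": values}


result = run_blocking(fake_run, None, {"query": "hello"})
```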

API Reference

@interrupt Decorator

from hypergraph import interrupt

@interrupt(output_name="decision")
def approval(draft: str) -> str | None:
    return None  # pause for human review
| Parameter | Type | Description |
| --- | --- | --- |
| output_name | str or tuple[str, ...] | Name(s) for output value(s) (required) |
| rename_inputs | dict[str, str] or None | Mapping to rename inputs (old to new) |
| cache | bool | Enable result caching (default: False) |
| emit | str or tuple[str, ...] or None | Ordering-only output name(s) |
| wait_for | str or tuple[str, ...] or None | Ordering-only input name(s) |
| hide | bool | Whether to hide from visualization |

PauseInfo

@dataclass
class PauseInfo:
    node_name: str                          # Name of the interrupt node (uses "/" for nesting)
    output_param: str                       # First output parameter name
    value: Any                              # First input value surfaced to the caller
    output_params: tuple[str, ...] | None   # All output names (multi-output), else None
    values: dict[str, Any] | None           # All input values (multi-input), else None
| Property | Type | Description |
| --- | --- | --- |
| response_key | str | Key to use when resuming (first output). Top-level: output_param. Nested: dot-separated path (e.g., "inner.decision") |
| response_keys | dict[str, str] | Map of all output names to resume keys (for multi-output interrupts) |
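
For a multi-output interrupt, the resume dict can be built by translating answers keyed by output name into the keys given by response_keys. The sketch below assumes response_keys behaves like the dict described in the table. build_resume_values is a hypothetical helper, and the SimpleNamespace object and its key values are illustrative stand-ins for a real PauseInfo.

```python
from types import SimpleNamespace
from typing import Any


def build_resume_values(pause: Any, answers: dict[str, Any]) -> dict[str, Any]:
    """Map answers keyed by output name onto the resume keys in pause.response_keys."""
    missing = [name for name in pause.response_keys if name not in answers]
    if missing:
        raise KeyError(f"missing answers for outputs: {missing}")
    return {pause.response_keys[name]: answers[name] for name in pause.response_keys}


# Stand-in pause for a nested interrupt with two outputs (illustrative values).
pause = SimpleNamespace(
    response_keys={"decision": "inner.decision", "notes": "inner.notes"}
)
resume = build_resume_values(
    pause, {"decision": "approve", "notes": "tighten the intro"}
)
```

The resulting dict would be merged with the original inputs and passed back to the runner when resuming.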
