Overview
Human-in-the-loop (HITL) enables LangGraph agents to pause for human approval, input, or correction. This is critical for high-stakes decisions, quality control, and building trustworthy AI systems.
Interrupt Points
The simplest HITL pattern: pause execution at specific nodes.
Interrupt Before
Pause before a node executes:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing_extensions import TypedDict

class State(TypedDict):
    plan: str
    approved: bool

def create_plan(state: State) -> dict:
    return {"plan": "Execute action X"}

def execute_plan(state: State) -> dict:
    # This only runs after human approval
    return {"approved": True}

builder = StateGraph(State)
builder.add_node("planner", create_plan)
builder.add_node("executor", execute_plan)
builder.add_edge(START, "planner")
builder.add_edge("planner", "executor")
builder.add_edge("executor", END)

# Pause before the executor runs
checkpointer = InMemorySaver()
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["executor"]
)
Interrupt After
Pause after a node completes:
# Review output before continuing
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_after=["planner"]  # Review the plan before execution
)
Interrupt All
Pause at every node:
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before="*",  # Pause before every node
    interrupt_after="*"    # Or after every node
)
Interrupts require a checkpointer. Without one, the graph cannot save state to resume later.
Using Interrupts
Basic Workflow
config = {"configurable": {"thread_id": "workflow-1"}}

# First invocation - runs until the interrupt
result = graph.invoke({"plan": "", "approved": False}, config)

# Check state at the interrupt
state = graph.get_state(config)
print(state.values)  # {'plan': 'Execute action X', 'approved': False}
print(state.next)    # ('executor',) - waiting to run executor

# Human reviews the plan (human_approves is a placeholder for your review logic)
if human_approves(state.values["plan"]):
    # Resume execution by passing None as the input
    result = graph.invoke(None, config)
    print(result)  # {'plan': '...', 'approved': True}
Streaming with Interrupts
config = {"configurable": {"thread_id": "workflow-2"}}

# Stream until the interrupt
for event in graph.stream({"plan": ""}, config, stream_mode="updates"):
    print(event)
# {'planner': {'plan': 'Execute action X'}}
# Pauses here because of interrupt_before=["executor"]

# Resume streaming
for event in graph.stream(None, config, stream_mode="updates"):
    print(event)
# {'executor': {'approved': True}}
Dynamic Interrupts
Trigger interrupts programmatically from within nodes:
from langgraph.types import interrupt

def review_node(state: State) -> dict:
    # Analyze the content (requires_review is a placeholder for your own check)
    if requires_review(state["content"]):
        # Interrupt and ask for human input
        human_input = interrupt(
            {
                "question": "Does this look correct?",
                "content": state["content"],
                "options": ["approve", "reject", "edit"]
            }
        )
        # This code runs AFTER the human resumes
        if human_input == "reject":
            return {"status": "rejected"}
        elif human_input == "edit":
            return {"content": get_edited_content()}
    return {"status": "approved"}

builder.add_node("review", review_node)
Using interrupt()
The interrupt() function behaves differently on each pass through the node:
First call: raises GraphInterrupt, pausing execution and surfacing the interrupt value to the client
On resume: the node re-executes from the start, and interrupt() returns the resume value supplied with the next invocation
from langgraph.types import Command

config = {"configurable": {"thread_id": "review-1"}}

# First run - encounters the interrupt
for event in graph.stream({"content": "Draft"}, config):
    if "__interrupt__" in event:
        interrupt_info = event["__interrupt__"][0]
        print(interrupt_info.value)
        # {'question': 'Does this look correct?', ...}

# The interrupt is also available on the saved state
state = graph.get_state(config)
print(state.interrupts[0].value)  # Same as above

# Resume with the human decision
for event in graph.stream(
    Command(resume="approve"),  # Provide the resume value
    config
):
    print(event)
# {'review': {'status': 'approved'}}
Multiple Interrupts
A node can have multiple interrupt points:
def complex_node(state: State) -> dict:
    # First interrupt
    step1_approval = interrupt("Approve step 1?")
    if step1_approval:
        process_step1()
    # Second interrupt
    step2_approval = interrupt("Approve step 2?")
    if step2_approval:
        process_step2()
    return {"status": "complete"}

# Resume with a list of values (matched in order)
for event in graph.stream(
    Command(resume=[True, True]),  # Approve both steps
    config
):
    print(event)
Resume values are matched to interrupts in the order they appear in the node’s execution.
Editing State During Interrupts
Modify state while paused:
config = {"configurable": {"thread_id": "edit-1"}}

# Run until the interrupt
graph.invoke({"draft": "Initial content"}, config)

# Check state
state = graph.get_state(config)
print(state.values["draft"])

# Human edits the draft
updated_config = graph.update_state(
    config,
    values={"draft": "Edited content"},
    as_node="editor"  # Attribute the change to the 'editor' node
)

# Resume with the edited state
result = graph.invoke(None, updated_config)
Update and Resume in One Step
Combine state update with resuming:
from langgraph.types import Command

# Update state and resume the interrupt in a single command
for event in graph.stream(
    Command(
        update={"draft": "Edited content"},
        resume="approve"
    ),
    config
):
    print(event)
Inspecting Interrupts
Access detailed information about pending interrupts:
state = graph.get_state(config)
for pending in state.interrupts:  # avoid shadowing the imported interrupt()
    print(pending.value)  # Data passed to interrupt()
    print(pending.id)     # Unique interrupt ID

# Use an interrupt ID to resume a specific interrupt
interrupt_id = state.interrupts[0].id
for event in graph.stream(
    Command(resume={interrupt_id: "approval_value"}),
    config
):
    print(event)
Approval Workflows
Simple Approval
def requires_approval(state: State) -> dict:
    if state["amount"] > 1000:
        approval = interrupt(f"Approve ${state['amount']} transaction?")
        if not approval:
            return {"status": "rejected"}
    return {"status": "approved"}
Multi-Level Approval
def multi_level_approval(state: State) -> dict:
    amount = state["amount"]
    # Level 1: manager approval for $1k+
    if amount >= 1000:
        manager_approval = interrupt(
            {"level": "manager", "amount": amount}
        )
        if not manager_approval:
            return {"status": "rejected"}
    # Level 2: director approval for $10k+
    if amount >= 10000:
        director_approval = interrupt(
            {"level": "director", "amount": amount}
        )
        if not director_approval:
            return {"status": "rejected"}
    return {"status": "approved", "amount": amount}
Review and Edit Workflows
Content Review
def content_reviewer(state: State) -> dict:
    draft = state["draft"]
    # Human reviews the draft
    feedback = interrupt({
        "action": "review",
        "draft": draft,
        "options": ["approve", "request_changes", "reject"]
    })
    if feedback["action"] == "approve":
        return {"status": "approved", "final": draft}
    elif feedback["action"] == "request_changes":
        # Loop back for revisions
        return {
            "status": "needs_revision",
            "feedback": feedback["comments"]
        }
    else:  # reject
        return {"status": "rejected"}
Interactive Editing
def interactive_editor(state: State) -> dict:
    content = state["content"]
    while True:
        # Show content and get feedback
        action = interrupt({
            "content": content,
            "prompt": "Edit, approve, or reject?"
        })
        if action["type"] == "edit":
            content = action["new_content"]
            continue
        elif action["type"] == "approve":
            return {"content": content, "status": "approved"}
        else:  # reject
            return {"status": "rejected"}
Be cautious with loops around interrupt(). Each resume re-executes the node from the start, replaying earlier interrupt() calls from their cached resume values, so any code outside those calls runs again on every resume.
Error Handling
Retry After Human Review
def safe_operation(state: State) -> dict:
    try:
        # Note: on resume the node re-executes, so this call is attempted again
        result = risky_operation(state["input"])
        return {"result": result}
    except Exception as e:
        # Let the human decide whether to retry
        action = interrupt({
            "error": str(e),
            "question": "Retry with different input?",
            "current_input": state["input"]
        })
        if action["retry"]:
            # Try with the modified input
            result = risky_operation(action["new_input"])
            return {"result": result, "input": action["new_input"]}
        else:
            raise
Building HITL UIs
CLI Interface
import uuid

from langgraph.types import Command

config = {"configurable": {"thread_id": str(uuid.uuid4())}}
user_input = {"plan": "", "approved": False}  # initial graph input

while True:
    # Run until an interrupt or completion
    for event in graph.stream(user_input, config):
        if "__interrupt__" in event:
            # Display the interrupt to the user
            interrupt_data = event["__interrupt__"][0].value
            print("\n⚠️ Approval Required:")
            print(f"{interrupt_data}")
            # Get human input
            response = input("\nYour decision (yes/no): ").lower()
            approval = response == "yes"
            # Resume with the decision
            user_input = Command(resume=approval)
            break
        else:
            print(event)
    else:
        # No interrupt - we're done
        break
Web API
import uuid
from typing import Any

from fastapi import FastAPI
from pydantic import BaseModel
from langgraph.types import Command

app = FastAPI()

class ResumeRequest(BaseModel):
    thread_id: str
    resume_value: Any

@app.post("/start")
async def start_workflow(input_data: dict):
    thread_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}
    # Run until an interrupt
    result = graph.invoke(input_data, config)
    # Check for interrupts
    state = graph.get_state(config)
    if state.interrupts:
        return {
            "status": "interrupted",
            "thread_id": thread_id,
            "interrupt": state.interrupts[0].value,
            "next": state.next
        }
    return {"status": "complete", "result": result}

@app.post("/resume")
async def resume_workflow(request: ResumeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    # Resume with the human input
    result = graph.invoke(
        Command(resume=request.resume_value),
        config
    )
    # Check for further interrupts
    state = graph.get_state(config)
    if state.interrupts:
        return {
            "status": "interrupted",
            "interrupt": state.interrupts[0].value
        }
    return {"status": "complete", "result": result}
Best Practices
When to use HITL:
High-stakes decisions (financial, legal, medical)
Quality control and content moderation
Ambiguous situations requiring judgment
Initial system deployment (reduce automation gradually)
Compliance and audit requirements
Training and validation scenarios
Designing interrupts:
Provide clear context in interrupt values
Include relevant state data
Offer specific action options
Set appropriate timeout expectations
Log all human decisions for audit
Make interrupt points resumable
Managing state:
Always use persistent checkpointers in production
Test resume logic thoroughly
Handle state updates gracefully
Validate human input before resuming
Consider versioning for long-running workflows
Building the human experience:
Show progress before and after interrupts
Provide clear instructions to humans
Display relevant context efficiently
Support undo/redo when possible
Track and display approval history
Troubleshooting
Interrupts not triggering:
Verify a checkpointer is configured
Check the interrupt_before/interrupt_after configuration
Ensure node names are correct
For dynamic interrupts, verify interrupt() is actually called
Resume not working:
Confirm the thread_id matches the original run
Check the Command(resume=…) syntax
Verify the interrupt ID for multi-interrupt nodes
Review the node's re-execution logic
State not persisting:
Use a persistent checkpointer (not InMemorySaver) in production
Verify database connectivity
Check thread_id consistency
Review checkpoint write logs
Next Steps
Checkpointing: a deep dive into state persistence for HITL
Streaming: stream events to build responsive HITL UIs