Skip to main content

Overview

Human-in-the-Loop (HITL) workflows pause execution to collect input from humans, then resume based on the response. Use cases include:
  • Approval workflows: Review and approve agent actions
  • Ambiguous decisions: Escalate to humans when AI is uncertain
  • Quality control: Human review before final output
  • Data collection: Gather additional context from users

How It Works

  1. Request: Executor calls ctx.request_info() with request data
  2. Pause: Workflow pauses and emits a request_info event
  3. Human Response: External system collects response
  4. Resume: Workflow resumes with response data
  5. Handle Response: Executor's @response_handler processes the response
Human-in-the-loop workflow diagram

Basic Pattern

Request Information

Use ctx.request_info() to pause and request input:
Python
from agent_framework import Executor, WorkflowContext, handler

class ReviewGateway(Executor):
    """Gateway executor that pauses the workflow for a human review step."""

    @handler
    async def review_content(self, content: str, ctx: WorkflowContext) -> None:
        """Suspend execution until a human reviews *content*."""
        # Build the payload shown to the reviewer, then hand control back
        # to the runtime until a response of the declared type arrives.
        request_payload = {
            "prompt": "Review this content. Reply 'approve' or provide feedback.",
            "content": content,
        }
        await ctx.request_info(request_data=request_payload, response_type=str)

Handle Response

Use @response_handler to process the human response:
Python
from agent_framework import response_handler

class ReviewGateway(Executor):
    """Pauses for human approval and routes the verdict onward."""

    @handler
    async def review_content(self, content: str, ctx: WorkflowContext) -> None:
        """Ask a human whether *content* should be approved."""
        payload = {"prompt": "Approve?", "content": content}
        await ctx.request_info(request_data=payload, response_type=str)

    @response_handler
    async def on_human_response(
        self,
        original_request: dict,  # the request_data sent by review_content
        response: str,           # the text the human typed
        ctx: WorkflowContext[str, str]
    ) -> None:
        """Yield approved content, or forward feedback for revision."""
        approved = response.lower() == "approve"
        if not approved:
            # Not approved: route the human feedback to the next executor.
            await ctx.send_message(f"Revise based on: {response}")
        else:
            await ctx.yield_output(f"Approved: {original_request['content']}")

Running with Human Input

Interactive Workflow

Collect input during execution:
Python
# NOTE(review): uses top-level ``async for`` — run this inside an async entry point.
workflow = create_workflow()

# Track pending requests (request_id -> request data) so each response
# can be matched back to the request that produced it.
pending_requests = {}

# Stream events until the workflow pauses for human input.
async for event in workflow.run("initial input", stream=True):
    if event.type == "request_info":
        # Human input needed
        pending_requests[event.request_id] = event.data
        print(f"Request: {event.data['prompt']}")

        # Collect response from user
        response = input("Your response: ")

        # Resume with the response, keyed by the originating request_id.
        responses = {event.request_id: response}
        async for resume_event in workflow.run(stream=True, responses=responses):
            if resume_event.type == "output":
                print(f"Output: {resume_event.data}")
                break

Pre-supplied Responses

Provide responses when resuming from checkpoint:
Python
# NOTE(review): uses top-level ``await`` — run this inside an async entry point.
# Load checkpoint with pending requests
checkpoint = await storage.load(checkpoint_id)

# Examine the requests that were still unanswered when the workflow paused
for request_id, request_event in checkpoint.pending_request_info_events.items():
    print(f"Pending: {request_event.data}")

# Provide responses keyed by request_id (the ids shown here are placeholders
# for the real ids emitted by the workflow)
responses = {
    "request-id-1": "approve",
    "request-id-2": "needs revision"
}

# Resume with responses; execution continues from the checkpointed state
events = await workflow.run(
    checkpoint_id=checkpoint_id,
    responses=responses
)

Complete Example: Content Review Workflow

A workflow where an agent drafts content and a human reviews it:
import asyncio
import os
from dataclasses import dataclass
from typing import Any

from agent_framework import (
    Executor,
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    Message,
    WorkflowBuilder,
    WorkflowContext,
    FileCheckpointStorage,
    handler,
    response_handler
)
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential

@dataclass
class ApprovalRequest:
    """Structured request payload shown to the human reviewer."""

    # Instructions displayed to the reviewer.
    prompt: str
    # The draft article text awaiting approval.
    draft: str
    # Count of review rounds so far (first round is 1).
    iteration: int

class ContentPreparer(Executor):
    """Turns the incoming topic into a drafting request for the writer agent."""

    def __init__(self, id: str, writer_id: str):
        super().__init__(id=id)
        # Executor id of the writer agent that receives our requests.
        self._writer_id = writer_id

    @handler
    async def prepare(
        self,
        topic: str,
        ctx: WorkflowContext[AgentExecutorRequest]
    ) -> None:
        """Remember the topic and dispatch a drafting request to the writer."""
        ctx.set_state("topic", topic)
        instruction = f"Write a brief article about: {topic}"
        request = AgentExecutorRequest(
            messages=[Message("user", text=instruction)],
            should_respond=True
        )
        await ctx.send_message(request, target_id=self._writer_id)

class ReviewGateway(Executor):
    """Sends each agent draft to a human reviewer and routes the verdict.

    Approved drafts become workflow output; anything else is turned into a
    revision request sent back to the writer agent, forming a review loop.

    Fix: the checkpoint hooks annotate with ``Any``, which the example never
    imported; ``from typing import Any`` is added to the file's import block.
    """

    def __init__(self, id: str, writer_id: str):
        super().__init__(id=id)
        # Executor id of the writer agent that handles revision requests.
        self._writer_id = writer_id
        # Number of drafts reviewed so far; persisted across checkpoints.
        self._iteration = 0

    @handler
    async def review(
        self,
        response: AgentExecutorResponse,
        ctx: WorkflowContext
    ) -> None:
        """Pause the workflow and ask a human to approve the latest draft."""
        self._iteration += 1
        draft = response.agent_response.text

        # Request human approval; execution suspends here until a str
        # response is supplied for this request.
        await ctx.request_info(
            request_data=ApprovalRequest(
                prompt="Review draft. Reply 'approve' or provide feedback.",
                draft=draft,
                iteration=self._iteration
            ),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: ApprovalRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest | str, str]
    ) -> None:
        """Yield the draft if approved; otherwise request a revision."""
        if feedback.strip().lower() == "approve":
            await ctx.yield_output(original_request.draft)
        else:
            # Fold the human feedback into a new prompt for the writer.
            prompt = (
                f"Revise this draft based on feedback:\n"
                f"Draft: {original_request.draft}\n"
                f"Feedback: {feedback}"
            )
            await ctx.send_message(
                AgentExecutorRequest(
                    messages=[Message("user", text=prompt)],
                    should_respond=True
                ),
                target_id=self._writer_id
            )

    async def on_checkpoint_save(self) -> dict[str, Any]:
        """Persist reviewer state so the iteration count survives a restart."""
        return {"iteration": self._iteration}

    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        """Restore the iteration count saved by on_checkpoint_save."""
        self._iteration = state.get("iteration", 0)

async def main():
    """Run the content-review workflow interactively from the console."""
    # Azure OpenAI client configured from environment variables; requires
    # a prior `az login` for AzureCliCredential to succeed.
    client = AzureOpenAIResponsesClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        credential=AzureCliCredential()
    )
    
    writer = client.as_agent(
        name="writer",
        instructions="Write concise, engaging articles."
    )
    
    storage = FileCheckpointStorage("./checkpoints")
    # NOTE(review): the preparer/reviewer are wired with writer_id="writer" —
    # assumes AgentExecutor derives its executor id from the agent's name;
    # confirm against the framework docs.
    writer_executor = AgentExecutor(writer)
    preparer = ContentPreparer(id="preparer", writer_id="writer")
    reviewer = ReviewGateway(id="reviewer", writer_id="writer")
    
    # preparer -> writer -> reviewer, with a back-edge so rejected drafts
    # loop to the writer for revision (bounded by max_iterations).
    workflow = (
        WorkflowBuilder(
            start_executor=preparer,
            checkpoint_storage=storage,
            max_iterations=10
        )
        .add_edge(preparer, writer_executor)
        .add_edge(writer_executor, reviewer)
        .add_edge(reviewer, writer_executor)  # Revision loop
        .build()
    )
    
    # Interactive session: alternate between streaming events and collecting
    # console responses until the workflow yields its final output.
    pending_requests = {}
    responses = None
    
    while True:
        if responses:
            # Resume the paused workflow with the responses gathered below.
            events_stream = workflow.run(stream=True, responses=responses)
            pending_requests.clear()
            responses = None
        else:
            # First iteration: start the workflow with the initial topic.
            events_stream = workflow.run(
                message="electric vehicles",
                stream=True
            )
        
        async for event in events_stream:
            if event.type == "output":
                # Final result reached — print and stop the session.
                print(f"\nFinal output: {event.data}")
                return
            
            if event.type == "request_info":
                # event.data is the ApprovalRequest built by ReviewGateway.
                request = event.data
                pending_requests[event.request_id] = request
                print(f"\n{'='*60}")
                print(f"Iteration {request.iteration}")
                print(f"{request.prompt}")
                print(f"\nDraft:\n{request.draft}")
                print(f"{'='*60}")
        
        # Collect responses for every request the stream surfaced.
        if pending_requests:
            responses = {}
            for req_id, request in pending_requests.items():
                feedback = input("Your response (or 'approve'): ").strip()
                responses[req_id] = feedback

if __name__ == "__main__":
    asyncio.run(main())

Checkpoint Integration

Combine HITL with checkpoints for durable pause/resume:
Python
import asyncio
from pathlib import Path

# Checkpoints are written here whenever the workflow pauses.
storage = FileCheckpointStorage(Path("./checkpoints"))

async def run_with_pause_resume():
    """Demonstrate a durable HITL pause: exit mid-run, resume from checkpoint."""
    workflow = create_workflow(checkpoint_storage=storage)
    
    # Run until human input needed; break as soon as the workflow pauses.
    async for event in workflow.run("initial input", stream=True):
        if event.type == "request_info":
            print(f"\nWorkflow paused. Request ID: {event.request_id}")
            print(f"Request: {event.data}")
            break
    
    # Program can exit here. Checkpoint is saved.
    print("\nExiting. Run this script again to resume.")
    
    # Later (in same or different process): locate the most recent checkpoint
    # and only resume if it still has unanswered request-info events.
    latest = await storage.get_latest(workflow_name=workflow.name)
    if latest and latest.pending_request_info_events:
        print("\nResuming from checkpoint...")
        
        # Collect responses for pending requests, keyed by request id.
        responses = {}
        for req_id, request_event in latest.pending_request_info_events.items():
            response = input(f"Response for {request_event.data}: ")
            responses[req_id] = response
        
        # Resume with responses from the saved checkpoint state.
        events = await workflow.run(
            checkpoint_id=latest.checkpoint_id,
            responses=responses
        )
        
        outputs = events.get_outputs()
        print(f"Final output: {outputs}")

Multiple Concurrent Requests

Handle multiple request-info calls in parallel:
Python
class ParallelReviewGateway(Executor):
    """Fans one piece of content out to several reviewers and gathers verdicts."""

    def __init__(self, id: str, reviewers: list[str]):
        super().__init__(id=id)
        # Names of the humans whose approval is required.
        self._reviewers = reviewers

    @handler
    async def request_reviews(
        self,
        content: str,
        ctx: WorkflowContext
    ) -> None:
        """Emit one request-info call per reviewer for the same content."""
        for name in self._reviewers:
            payload = {
                "reviewer": name,
                "content": content,
                "prompt": f"Review for {name}"
            }
            await ctx.request_info(request_data=payload, response_type=str)

    @response_handler
    async def on_reviewer_response(
        self,
        original_request: dict,
        response: str,
        ctx: WorkflowContext[Never, str]
    ) -> None:
        """Record one verdict; yield the overall result once all are in."""
        # Accumulate verdicts in executor state, keyed by reviewer name.
        collected = ctx.get_state("reviewer_responses", {})
        collected[original_request["reviewer"]] = response
        ctx.set_state("reviewer_responses", collected)

        if len(collected) != len(self._reviewers):
            return  # Still waiting on other reviewers.

        unanimous = all(
            verdict.lower() == "approve"
            for verdict in collected.values()
        )
        if unanimous:
            await ctx.yield_output("All reviewers approved")
        else:
            await ctx.yield_output(f"Feedback: {collected}")

# Usage
gateway = ParallelReviewGateway(
    id="gateway",
    reviewers=["alice", "bob", "charlie"]
)

# When running, provide all responses. The keys are placeholders for the
# real request_ids emitted by the workflow's request_info events.
responses = {
    "request-id-alice": "approve",
    "request-id-bob": "looks good",
    "request-id-charlie": "approve"
}

# NOTE(review): top-level ``await`` — run inside an async entry point.
events = await workflow.run(responses=responses)

Typed Request-Response

Use dataclasses for type-safe request-response:
Python
from dataclasses import dataclass
from pydantic import BaseModel

# Define request structure
@dataclass
class ClassificationRequest:
    """Typed request asking a human to classify one item."""

    # Identifier of the item being classified.
    item_id: str
    # Free-text description shown to the human.
    description: str
    # Allowed category labels to choose from.
    categories: list[str]

# Define response structure
class ClassificationResponse(BaseModel):
    """Typed, validated response produced by the human classifier."""

    # Category label chosen by the human.
    category: str
    # Confidence in the chosen category.
    confidence: float
    # Free-text remarks from the human.
    notes: str

class Classifier(Executor):
    """Asks a human to classify an item and emits the structured result."""

    @handler
    async def classify(
        self,
        item: dict,
        ctx: WorkflowContext
    ) -> None:
        """Pause and ask a human to pick a category for *item*."""
        request = ClassificationRequest(
            item_id=item["id"],
            description=item["description"],
            categories=["A", "B", "C"]
        )
        # The declared response_type lets the framework validate the reply.
        await ctx.request_info(
            request_data=request,
            response_type=ClassificationResponse
        )

    @response_handler
    async def on_classification(
        self,
        original_request: ClassificationRequest,
        response: ClassificationResponse,
        ctx: WorkflowContext[Never, dict]
    ) -> None:
        """Combine the request and the typed response into the output record."""
        await ctx.yield_output({
            "item_id": original_request.item_id,
            "category": response.category,
            "confidence": response.confidence
        })

Best Practices

  • Use clear, specific prompts in request data
  • Include context needed for human decision
  • Use structured data (dataclasses, Pydantic models) for type safety
  • Provide sensible defaults when possible
  • Validate responses before processing
  • Handle unexpected responses gracefully
  • Provide feedback to user about what happens next
  • Log all request-response pairs for audit
  • Always use checkpoint storage for HITL workflows
  • Test checkpoint restore with pending requests
  • Document what happens if response is never provided
  • Implement timeouts for critical workflows
  • Show clear request IDs for tracking
  • Display relevant context with each request
  • Support bulk response collection
  • Provide status indicators (pending, responded, processed)

Advanced Patterns

Escalation Pattern

Escalate to human only when AI is uncertain:
Python
class SmartReviewer(Executor):
    """Auto-approves confident results; escalates uncertain ones to a human."""

    def __init__(self, id: str, confidence_threshold: float = 0.8):
        super().__init__(id=id)
        # Results at or above this confidence skip human review.
        self._threshold = confidence_threshold

    @handler
    async def review(
        self,
        result: dict,
        ctx: WorkflowContext[str]
    ) -> None:
        """Approve automatically or pause for human review, based on confidence."""
        confidence = result.get("confidence", 0)

        if confidence < self._threshold:
            # Below the bar: hand the decision to a human.
            await ctx.request_info(
                request_data={
                    "prompt": f"Low confidence ({confidence}). Please review.",
                    "result": result
                },
                response_type=str
            )
            return

        # Confident enough to approve without human input.
        await ctx.yield_output(f"Auto-approved: {result}")

    @response_handler
    async def on_human_review(
        self,
        original_request: dict,
        decision: str,
        ctx: WorkflowContext[Never, str]
    ) -> None:
        """Forward the human's decision as the workflow output."""
        await ctx.yield_output(f"Human decision: {decision}")

Tool Approval Pattern

Require approval before executing sensitive tools:
Python
class ToolApprovalGateway(Executor):
    """Gates sensitive tool calls behind an explicit human yes/no."""

    @handler
    async def on_tool_call(
        self,
        tool_call: dict,
        ctx: WorkflowContext
    ) -> None:
        """Forward safe tools immediately; pause for approval on sensitive ones."""
        tool_name = tool_call.get("name")

        if tool_name not in ["delete_data", "send_email", "make_payment"]:
            # Auto-approve safe tools and pass the call straight through.
            await ctx.send_message(tool_call)
            return

        # Sensitive tool: require a boolean verdict before it may run.
        await ctx.request_info(
            request_data={
                "prompt": f"Approve execution of {tool_name}?",
                "tool": tool_name,
                "args": tool_call.get("arguments")
            },
            response_type=bool
        )

    @response_handler
    async def on_approval(
        self,
        original_request: dict,
        approved: bool,
        ctx: WorkflowContext[dict, str]
    ) -> None:
        """Execute or reject the pending tool call based on the human verdict."""
        if not approved:
            await ctx.yield_output(f"Tool {original_request['tool']} rejected")
        else:
            await ctx.send_message(original_request)

Next Steps

Checkpoints

Learn more about checkpoint integration with HITL

Orchestration Patterns

Explore workflow orchestration patterns

Build docs developers (and LLMs) love