LangGraphOrchestrator is an alternative to SequentialWorkflow that uses LangGraph’s StateGraph to wire planning, execution, and monitoring as explicit graph nodes. This provides durable, inspectable state rather than plain Python control flow.

OrchestratorState

All data passed between nodes lives in a TypedDict:
from typing import Annotated, Any, Dict, List, TypedDict
import operator

class OrchestratorState(TypedDict):
    task: str                                          # The original user task
    plan: List[str]                                    # Steps generated by PlanningAgent
    current_step_index: int                            # Which step is being executed
    context: str                                       # Accumulated step results
    results: Annotated[List[Dict[str, Any]], operator.add]  # Append-only result list
    attempts: int                                      # Retry count for current step
    max_retries: int                                   # Configured retry limit
    status: str                                        # planning | executing | monitoring | success | failed
results uses Annotated[..., operator.add] — LangGraph merges partial state updates by appending to this list rather than replacing it.
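The merge behavior can be illustrated without LangGraph itself: the reducer declared in the Annotated type is just operator.add applied to the existing value and the node's patch.

```python
import operator

# LangGraph merges each node's returned patch into the state using the
# reducer declared in Annotated[...]; for `results` that reducer is
# operator.add, i.e. plain list concatenation.
current = [{"step": "step 1", "result": "ok", "status": "validated"}]
patch = [{"step": "step 2", "result": "ok", "status": "pending_validation"}]

merged = operator.add(current, patch)  # equivalent to current + patch
print(len(merged))        # 2 -- the patch is appended, not substituted
print(merged[0]["step"])  # step 1 -- earlier results survive the merge
```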

Building the graph

from langgraph.graph import StateGraph, END
from langchain_ollama import ChatOllama
from agents.planning_agent import PlanningAgent
from agents.execution_agent import ExecutionAgent
from agents.monitoring_agent import MonitoringAgent
from orchestators.langgraph_orchestrator import LangGraphOrchestrator

llm = ChatOllama(model="llama3")

orchestrator = LangGraphOrchestrator(
    planner=PlanningAgent(llm=llm),
    executor=ExecutionAgent(llm=llm),
    monitor=MonitoringAgent(llm=llm),
    compressor=None,   # or CompressContextTool() / LocalAgent()
    max_retries=2
)
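The compressor parameter is duck-typed: the orchestrator probes for an .invoke(context) method (Runnable-style) and falls back to ._run(context) (Tool-style). A minimal stand-in that satisfies that interface (TruncatingCompressor is a hypothetical example for illustration, not part of the codebase):

```python
class TruncatingCompressor:
    """Hypothetical compressor: keeps only the most recent characters.

    Any object exposing .invoke(context) or ._run(context) satisfies
    the duck-typed interface the orchestrator probes with hasattr().
    """

    def __init__(self, max_chars: int = 2000):
        self.max_chars = max_chars

    def invoke(self, context: str) -> str:
        # Keep the tail of the context, where the most recent step
        # results (and any failure feedback) accumulate.
        return context[-self.max_chars:]

compressor = TruncatingCompressor(max_chars=10)
print(compressor.invoke("x" * 25))  # the last 10 characters only
```

A real setup would use an LLM-backed summarizer here; truncation just demonstrates the contract.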
Inside _build_graph(), the graph is wired as:
def _build_graph(self):
    workflow = StateGraph(OrchestratorState)

    workflow.add_node("planner_node",  self._node_planner)
    workflow.add_node("executor_node", self._node_executor)
    workflow.add_node("monitor_node",  self._node_monitor)

    workflow.set_entry_point("planner_node")

    workflow.add_edge("planner_node",  "executor_node")
    workflow.add_edge("executor_node", "monitor_node")

    workflow.add_conditional_edges(
        "monitor_node",
        self._route_after_monitor,
        {
            "next_step": "executor_node",
            "retry":     "executor_node",
            "end":       END
        }
    )

    return workflow.compile()

State flow

1. planner_node

Calls self.planner.generate_plan(state["task"]) and sets plan, current_step_index = 0, and status = "executing" in the returned state patch.

2. executor_node

Reads plan[current_step_index]. If a compressor is configured, the accumulated context is compressed before being passed to executor.execute_step(). Appends a {step, result, status: "pending_validation"} dict to results.
if context and self.compressor:
    if hasattr(self.compressor, 'invoke'):
        context = self.compressor.invoke(context)
    elif hasattr(self.compressor, '_run'):
        context = self.compressor._run(context)
3. monitor_node

Reads results[-1] and calls monitor.evaluate(current_step, actual_result). On success, increments current_step_index, resets attempts to 0, and marks the result as "validated". On failure, increments attempts and appends feedback to context.
4. _route_after_monitor (conditional edge)

Decides what happens next:
  • "end" — status == "failed" or all steps completed (current_step_index >= len(plan))
  • "retry" — attempts > 0 and not yet failed
  • "next_step" — step succeeded and more steps remain
Both "retry" and "next_step" route back to executor_node.
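Taken together, the routing predicate can be sketched as a pure function of the state. This is a hedged reconstruction from the rules above; the real method may differ in detail:

```python
def route_after_monitor(state: dict) -> str:
    # Terminal conditions first: hard failure, or every planned step done.
    if state["status"] == "failed":
        return "end"
    if state["current_step_index"] >= len(state["plan"]):
        return "end"
    # A non-zero attempts counter means the last step failed validation
    # but retries remain, so re-run the same step.
    if state["attempts"] > 0:
        return "retry"
    # Otherwise the step succeeded and more steps remain.
    return "next_step"

print(route_after_monitor(
    {"status": "executing", "plan": ["a", "b"],
     "current_step_index": 2, "attempts": 0}
))  # end -- all steps completed
```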

Retry logic

When a step fails, the monitor node:
  1. Increments attempts
  2. Appends "\n[Failed Attempt Feedback]: {feedback}" to context
  3. Returns {"attempts": attempts, "context": new_context, "status": "executing"}
The router sees attempts > 0 and routes back to executor_node. If attempts > max_retries, the monitor returns {"status": "failed"} and the router terminates with "end".
def _node_monitor(self, state):
    ...
    if not success:
        attempts = state.get("attempts", 0) + 1
        if attempts > self.max_retries:
            return {"status": "failed"}
        return {
            "attempts": attempts,
            "context": state["context"] + f"\n[Failed Attempt Feedback]: {feedback}",
            "status": "executing"
        }
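The success branch elided above ("...") can be sketched as follows, as a hypothetical reconstruction of what the section describes. Note the in-place mutation of the latest result: because results is merged with operator.add, returning a modified copy in the patch would append a duplicate entry rather than replace the original.

```python
def monitor_success_patch(state: dict) -> dict:
    # Hypothetical sketch of the success branch of _node_monitor.
    # Mark the just-executed result validated in place: `results` is an
    # append-only (operator.add) channel, so it cannot be "replaced"
    # through a returned patch.
    state["results"][-1]["status"] = "validated"
    return {
        "current_step_index": state["current_step_index"] + 1,
        "attempts": 0,  # reset the retry counter for the next step
        "status": "executing",
    }

state = {
    "results": [{"step": "s1", "result": "ok", "status": "pending_validation"}],
    "current_step_index": 0,
    "attempts": 1,
}
patch = monitor_success_patch(state)
print(patch["current_step_index"])     # 1
print(state["results"][-1]["status"])  # validated
```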

Running the orchestrator

result = orchestrator.run(task="What is the capital of Andorra?")

print(result["status"])   # "success" or "failed"
print(result["plan"])     # List of generated steps

for r in result["results"]:
    if r["status"] == "validated":
        print(f"Step: {r['step']}")
        print(f"Output: {r['result']}")
run() builds the initial state and calls self.graph.invoke(initial_state), which runs the compiled StateGraph to completion and returns the final OrchestratorState.
def run(self, task: str) -> Dict[str, Any]:
    initial_state = {
        "task": task,
        "plan": [],
        "current_step_index": 0,
        "context": "",
        "results": [],
        "attempts": 0,
        "max_retries": self.max_retries,
        "status": "planning"
    }
    return self.graph.invoke(initial_state)

LangGraphOrchestrator vs SequentialWorkflow

LangGraphOrchestrator

State is an explicit TypedDict passed through graph nodes. LangGraph manages transitions, enabling inspection, checkpointing, and replay. Context compression is built in via the compressor parameter.

SequentialWorkflow

State is implicit Python variables (context, results, attempts). Simpler setup with no LangGraph dependency. Compression is driven by tools[0] if present.
Both approaches implement the same Plan-Execute-Monitor loop. Prefer LangGraphOrchestrator when you need durable execution, state inspection, or plan to add branching logic. Use SequentialWorkflow for simpler setups where plain Python control flow is sufficient.
