LangGraphOrchestrator is a production-grade alternative to SequentialWorkflow. Rather than a plain Python loop, it models the planner → executor → monitor pipeline as a LangGraph state machine, giving you full visibility into state transitions and clean retry semantics.

The state machine

The graph has three nodes connected by edges:
planner_node → executor_node → monitor_node
                     ↑                 |
                     |  (next_step /   |
                     |      retry)     |
                     └─────────────────┘
                                       |
                                     (end)
Node           What it does
planner_node   Calls PlanningAgent.generate_plan() to produce a list of step strings
executor_node  Calls ExecutionAgent.execute_step() on the current step, optionally compressing context first
monitor_node   Calls MonitoringAgent.evaluate() and decides whether to advance, retry, or abort
After monitor_node, a conditional edge (_route_after_monitor) chooses one of three routes:
  • next_step — step succeeded, advance current_step_index and loop back to executor_node
  • retry — step failed but retries remain, loop back to executor_node with failure feedback appended to context
  • end — all steps complete or max retries exceeded, exit the graph
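The routing decision can be sketched as a plain function over the state dictionary. This is an illustrative reconstruction, not the actual `_route_after_monitor` from langgraph_orchestrator.py, but it follows the status values used elsewhere in this guide ("executing" on a retryable failure, "failed" when retries are exhausted):

```python
# Illustrative sketch of the conditional edge; the real
# _route_after_monitor in langgraph_orchestrator.py may differ in detail.
def route_after_monitor(state: dict) -> str:
    if state.get("status") == "failed":
        return "end"        # max retries exceeded
    if state.get("status") == "executing":
        return "retry"      # step rejected by the monitor, retries remain
    if state.get("current_step_index", 0) >= len(state.get("plan", [])):
        return "end"        # every step validated
    return "next_step"      # advance to the next step
```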

Building the orchestrator

1. Import agents and orchestrator

from agents import PlanningAgent, ExecutionAgent, MonitoringAgent
from orchestators import LangGraphOrchestrator
2. Initialize the LLM and create agents

Different agents can use different LLMs. In main.py, the monitor uses a cloud model while the planner and executor use a local one:
import os
from dotenv import load_dotenv
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from tools.curl_search_tool import CurlSearchTool

load_dotenv()

llm = ChatOllama(model=os.getenv("LOCAL_MODEL"))

llm_cloud = ChatOpenAI(
    model_name=os.getenv("SUMMARY_MODEL"),
    temperature=0.3,
    openai_api_key=os.getenv("SUMMARY_AGENT_API_KEY"),
    base_url=os.getenv("SUMMARY_HOST"),
)

curl_tool = CurlSearchTool()  # search tool passed to the executor

planner  = PlanningAgent(llm=llm)
executor = ExecutionAgent(llm=llm, tools=[curl_tool])
monitor  = MonitoringAgent(llm=llm_cloud)  # cloud model for stricter evaluation
3. Choose a context compressor (optional)

The compressor runs on accumulated context before each executor_node call. Two options are available:
from agents.local_agent import LocalAgent

context_compressor_agent = LocalAgent(llm=llm)
LocalAgent uses the local LLM to summarise context aggressively. It is preferred when Ollama is running, because it produces semantically dense summaries.
The fallback pattern from main.py: try to create a LocalAgent, catch ImportError, and fall back to CompressContextTool. See the local vs cloud LLM guide for the full pattern.
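A minimal sketch of that fallback, with the module paths used in this repository. The helper function shape here is illustrative; main.py inlines the same logic:

```python
# Sketch of the fallback described above. Wrapping both the import and
# the construction matches "try to create a LocalAgent"; the helper
# function itself is illustrative, not code from main.py.
def choose_compressor(llm, max_length=10000):
    try:
        from agents.local_agent import LocalAgent
        return LocalAgent(llm=llm)  # preferred: dense LLM summaries via Ollama
    except ImportError:
        from tools.compress_context_tool import CompressContextTool
        return CompressContextTool(max_length=max_length)  # non-LLM fallback
```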
4. Instantiate the orchestrator

orchestrator = LangGraphOrchestrator(
    planner=planner,
    executor=executor,
    monitor=monitor,
    compressor=context_compressor_agent,  # None to disable compression
    max_retries=2,
)
The graph is compiled once in __init__ via _build_graph() and reused across calls to .run().
5. Run the orchestrator

result = orchestrator.run(task="What is the capital of Andorra?")
6. Interpret the result

The return value is the full OrchestratorState dictionary:
{
    "status": "success",   # or "failed"
    "plan": [
        "Step 1: Research Andorra's geography",
        "Step 2: Identify and confirm the capital city",
    ],
    "results": [
        {
            "step": "Step 1: Research Andorra's geography",
            "result": "Andorra is a small principality ...",
            "status": "validated",   # "failed" for unsuccessful steps
        },
        {
            "step": "Step 2: Identify and confirm the capital city",
            "result": "The capital of Andorra is Andorra la Vella.",
            "status": "validated",
        },
    ],
    "current_step_index": 2,
    "attempts": 0,
    ...
}
Only steps with status == "validated" have a usable result. Steps marked "failed" were rejected by the monitor.
print(f"Status: {result['status']}")
print(f"Plan steps: {len(result['plan'])}")

for res in result.get('results', []):
    if res.get('status') == 'validated' and result['status'] == 'success':
        print(f"\nStep: {res['step']}\nOutput:\n{res['result']}")
7. Save the result to a file

From main.py, the full orchestrator state is serialised to result.txt:
import json

with open("result.txt", "w", encoding="utf-8") as f:
    json.dump(result, f, indent=4, ensure_ascii=False)

Retry behaviour

The attempts counter in OrchestratorState tracks failed attempts for the current step. It is reset to 0 every time a step succeeds.
# From langgraph_orchestrator.py — monitor node on failure
attempts = state.get("attempts", 0) + 1
new_context = state.get("context", "") + f"\n[Failed Attempt Feedback]: {feedback}"

if attempts > self.max_retries:
    return {"status": "failed"}  # routes to END

return {"attempts": attempts, "context": new_context, "status": "executing"}  # routes to retry
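Put together, the retry semantics amount to a small loop. The sketch below simulates a single step with stub execute/evaluate callables standing in for ExecutionAgent and MonitoringAgent; it is illustrative only — the real loop is driven by the compiled graph:

```python
# Self-contained simulation of the retry semantics above.
def run_step_with_retries(execute, evaluate, max_retries=2):
    attempts, context = 0, ""
    while True:
        result = execute(context)
        ok, feedback = evaluate(result)
        if ok:
            # success: attempts resets to 0 before the next step starts
            return {"status": "validated", "result": result, "attempts": attempts}
        attempts += 1
        context += f"\n[Failed Attempt Feedback]: {feedback}"
        if attempts > max_retries:
            return {"status": "failed", "attempts": attempts}

# A stub monitor that rejects the first two attempts, then validates:
verdicts = iter([(False, "too vague"), (False, "cite a source"), (True, "")])
outcome = run_step_with_retries(
    execute=lambda ctx: "Andorra la Vella",
    evaluate=lambda _res: next(verdicts),
)
assert outcome["status"] == "validated" and outcome["attempts"] == 2
```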

Context compression

Before each call to executor_node, the compressor runs on the accumulated context string:
# From langgraph_orchestrator.py — executor node
if context and self.compressor:
    if hasattr(self.compressor, 'invoke'):
        context = self.compressor.invoke(context)
    elif hasattr(self.compressor, '_run'):
        context = self.compressor._run(context)

result = self.executor.execute_step(current_step, context=context)
The orchestrator accepts any object with an invoke or _run method as a compressor — both LocalAgent and CompressContextTool satisfy this interface.
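Because the interface is duck-typed, you can also plug in your own compressor. The class below is hypothetical — a bare-minimum truncating compressor written only to illustrate the invoke contract:

```python
# Hypothetical compressor satisfying the duck-typed interface:
# any object exposing invoke(str) -> str (or _run) will work.
class TruncatingCompressor:
    def __init__(self, max_length: int = 2000):
        self.max_length = max_length

    def invoke(self, context: str) -> str:
        # Keep the tail of the context, where the freshest feedback lives.
        if len(context) <= self.max_length:
            return context
        return "[...older context truncated...]\n" + context[-self.max_length:]
```

Passing `compressor=TruncatingCompressor()` to LangGraphOrchestrator would then trade the LLM summarisation of LocalAgent for a cheap, deterministic cut.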

Full example

import os
import json
from dotenv import load_dotenv
from langchain_ollama import ChatOllama
from agents import PlanningAgent, ExecutionAgent, MonitoringAgent
from orchestators import LangGraphOrchestrator
from tools.compress_context_tool import CompressContextTool
from tools.curl_search_tool import CurlSearchTool

load_dotenv()

llm       = ChatOllama(model=os.getenv("LOCAL_MODEL"))
curl_tool = CurlSearchTool()

planner  = PlanningAgent(llm=llm)
executor = ExecutionAgent(llm=llm, tools=[curl_tool])
monitor  = MonitoringAgent(llm=llm)

compressor = CompressContextTool(max_length=10000)

orchestrator = LangGraphOrchestrator(
    planner=planner,
    executor=executor,
    monitor=monitor,
    compressor=compressor,
    max_retries=2,
)

try:
    result = orchestrator.run(task="What is the capital of Andorra?")
    print(f"Status: {result['status']}")
    print(f"Plan steps: {len(result['plan'])}")

    with open("result.txt", "w", encoding="utf-8") as f:
        json.dump(result, f, indent=4, ensure_ascii=False)

    for res in result.get('results', []):
        if res.get('status') == 'validated' and result['status'] == 'success':
            print(f"\nStep: {res['step']}\nOutput:\n{res['result']}")
except Exception as e:
    print(f"Orchestrator failed: {e}")
