LangGraphOrchestrator wires a PlanningAgent, ExecutionAgent, and MonitoringAgent into a compiled LangGraph state machine. Each node in the graph corresponds to one agent; a conditional router after the monitor node decides whether to advance to the next step, retry the current step, or terminate.

Import

from orchestators import LangGraphOrchestrator

Constructor

planner
PlanningAgent
required
Responsible for decomposing the input task into an ordered list of step strings via generate_plan(task).
executor
ExecutionAgent
required
Executes a single step given the step description and accumulated context via execute_step(step, context=...).
monitor
MonitoringAgent
required
Evaluates an execution result against the step objective via evaluate(objective, result). Must return a dict with "success" (bool) and "feedback" (str) keys.
compressor
Any
default: None
Optional context compressor. Called before each executor invocation when accumulated context is non-empty. Must expose either an invoke(text) or _run(text) method. Typically a CompressContextTool or a local agent.
max_retries
int
default: 2
Maximum number of retry attempts for a single step before the orchestrator sets status to "failed" and terminates.
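The three agents are duck-typed: any objects exposing the methods named above will work. A minimal sketch with stub implementations (these stub classes are illustrative assumptions, not part of the library; only the method names and return shapes come from the parameter descriptions above):

```python
class StubPlanner:
    def generate_plan(self, task):
        # Must return an ordered list of step strings.
        return [f"Research: {task}", f"Draft: {task}"]

class StubExecutor:
    def execute_step(self, step, context=""):
        # Must return the step's result given the accumulated context.
        return f"done: {step}"

class StubMonitor:
    def evaluate(self, objective, result):
        # Must return a dict with "success" (bool) and "feedback" (str) keys.
        return {"success": True, "feedback": "looks good"}
```

Stubs like these are also handy for testing orchestration logic without paying for LLM calls.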

State Machine

OrchestratorState fields

The full state dictionary passed between every node:
| Field | Type | Description |
| --- | --- | --- |
| task | str | The original task string |
| plan | List[str] | Steps generated by the planner |
| current_step_index | int | Index into plan for the active step |
| context | str | Accumulated results from completed steps |
| results | List[Dict[str, Any]] | Result records (append-only via operator.add) |
| attempts | int | Retry counter for the current step |
| max_retries | int | Copy of the constructor value |
| status | str | One of "planning", "executing", "success", or "failed" |
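In LangGraph, a state schema like this is typically declared as a TypedDict. A sketch consistent with the field table (the exact class definition is an assumption; the field names and the operator.add reducer on results come from the table above):

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict

class OrchestratorState(TypedDict):
    task: str
    plan: List[str]
    current_step_index: int
    context: str
    # Annotated with operator.add so LangGraph appends new records
    # to the list instead of overwriting it on each node update.
    results: Annotated[List[Dict[str, Any]], operator.add]
    attempts: int
    max_retries: int
    status: str
```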

Nodes

| Node | Role |
| --- | --- |
| planner_node | Calls generate_plan, sets plan, and resets current_step_index to 0 |
| executor_node | Runs the current step and appends a "pending_validation" result entry |
| monitor_node | Validates the latest result; updates status, context, and attempts |
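As a sketch of how executor_node can combine the optional compressor (invoke vs _run fallback, per the constructor docs) with execute_step. The standalone signature is an assumption for illustration: in the real graph, a node receives only the state and closes over the agents.

```python
def executor_node(state, executor, compressor=None):
    context = state["context"]
    if compressor is not None and context:
        # The compressor may expose invoke() or _run(); prefer invoke().
        compress = getattr(compressor, "invoke", None) or compressor._run
        context = compress(context)
    step = state["plan"][state["current_step_index"]]
    result = executor.execute_step(step, context=context)
    # Return only the updated key; the results reducer appends the record.
    return {"results": [
        {"step": step, "result": result, "status": "pending_validation"}
    ]}
```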

Routing after monitor_node

| Route key | Condition | Destination |
| --- | --- | --- |
| "next_step" | Step validated; more steps remain | executor_node |
| "retry" | Step failed; attempts ≤ max_retries | executor_node |
| "end" | All steps done or status == "failed" | END |
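The routing table could be implemented by a conditional-edge function along these lines (a sketch; how the real monitor_node encodes "step validated" — here assumed to be resetting attempts to 0 and advancing current_step_index on success — is an assumption):

```python
def route_after_monitor(state):
    # Terminal conditions first: retry budget exhausted, or plan complete.
    if state["status"] == "failed":
        return "end"
    if state["current_step_index"] >= len(state["plan"]):
        return "end"
    # A non-zero attempt counter means the last run failed within budget.
    if state["attempts"] > 0:
        return "retry"
    # Otherwise the step validated and more steps remain.
    return "next_step"
```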

Methods

run

Invokes the compiled LangGraph state machine and returns the final state.
def run(self, task: str) -> Dict[str, Any]
task
str
required
The high-level task or question to orchestrate.

Return value

The returned dictionary is the final OrchestratorState. Key fields:
status
str
Final workflow status. One of "success", "failed", or "executing" (if the graph terminated unexpectedly).
plan
List[str]
The full ordered list of steps generated by the planner.
results
List[Dict[str, Any]]
All result records accumulated during execution.
context
str
The accumulated context string built from all validated step results.
current_step_index
int
The index of the last step that was processed.
attempts
int
The retry counter at the time the graph terminated.
max_retries
int
The configured maximum retries value.

Usage Example

from orchestators import LangGraphOrchestrator
from agents.planning_agent import PlanningAgent
from agents.execution_agent import ExecutionAgent
from agents.monitoring_agent import MonitoringAgent
from tools.compress_context_tool import CompressContextTool

# `llm` is assumed to be an already-configured chat model shared by the agents
planner    = PlanningAgent(llm=llm)
executor   = ExecutionAgent(llm=llm)
monitor    = MonitoringAgent(llm=llm)
compressor = CompressContextTool(max_length=3000)

orchestrator = LangGraphOrchestrator(
    planner=planner,
    executor=executor,
    monitor=monitor,
    compressor=compressor,
    max_retries=3,
)

final_state = orchestrator.run(
    "Write a technical blog post about the benefits of async programming"
)

print(f"Status: {final_state['status']}")
for record in final_state["results"]:
    print(f"[{record['status']}] {record['step']}")
    print(f"  => {record['result']}\n")
