A SequentialWorkflow coordinates three agents — planner, executor, and monitor — in a fixed order. The planner breaks the task into steps, then the executor and monitor iterate over each step in sequence. If a step fails, the workflow retries it up to max_retries times before aborting the pipeline.

Prerequisites

Install the project dependencies with uv:
uv pip install -r pyproject.toml
With plain pip, install the project directly instead: pip install .
Copy .env.example to .env and fill in your credentials before running.

Building a sequential workflow

Step 1: Import agents and workflow

Import the three core agents and the SequentialWorkflow class:
from agents import PlanningAgent, ExecutionAgent, MonitoringAgent
from workflows import SequentialWorkflow
Step 2: Initialize an LLM

Choose a local or cloud LLM. The agents accept any BaseChatModel-compatible instance:
import os
from langchain_ollama import ChatOllama

llm = ChatOllama(model=os.getenv("LOCAL_MODEL"))
Step 3: Create the agents

Instantiate each agent with the LLM. The ExecutionAgent optionally accepts a list of tools:
from tools.curl_search_tool import CurlSearchTool

curl_tool = CurlSearchTool()

planner = PlanningAgent(llm=llm)
executor = ExecutionAgent(llm=llm, tools=[curl_tool])
monitor  = MonitoringAgent(llm=llm)
ExecutionAgent uses LangGraph’s create_react_agent to bind tools when tools is non-empty. Without tools the agent falls back to a plain LLM call.
Step 4: Build the agent dictionary

SequentialWorkflow requires exactly three keys: "planner", "executor", and "monitor". The constructor asserts their presence at init time.
agent_dict = {
    "planner": planner,
    "executor": executor,
    "monitor": monitor,
}
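The required-keys check can be sketched as follows. This is a hypothetical stand-in for the constructor's assertion, not the actual source:

```python
REQUIRED_AGENTS = {"planner", "executor", "monitor"}

def validate_agents(agents: dict) -> dict:
    # Fail fast at construction time if any required role is missing.
    missing = REQUIRED_AGENTS - agents.keys()
    assert not missing, f"Missing required agents: {sorted(missing)}"
    return agents
```

Failing fast here means a misconfigured dictionary is caught before any LLM calls are made.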
Step 5: Optionally attach tools to the workflow

Pass a list of tools to the workflow itself. The first tool in the list is used as a context compressor between steps. The workflow calls .invoke() if available, otherwise ._run().
from tools.compress_context_tool import CompressContextTool

compressor = CompressContextTool(max_length=10000)

sequential_workflow = SequentialWorkflow(
    agents=agent_dict,
    tools=[compressor, curl_tool],
)
Passing a CompressContextTool (or a LocalAgent compressor) as the first tool keeps accumulated context from blowing up the prompt as the workflow progresses through many steps.
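The duck-typed dispatch described above — prefer .invoke(), fall back to ._run() — can be sketched with stub tools. The classes and helper name here are illustrative, not the workflow's actual internals:

```python
def call_tool(tool, payload):
    # Prefer the Runnable-style .invoke() interface when present,
    # otherwise fall back to the legacy ._run() method.
    if hasattr(tool, "invoke"):
        return tool.invoke(payload)
    return tool._run(payload)

class ModernTool:
    def invoke(self, text):
        return f"compressed:{text[:10]}"

class LegacyTool:
    def _run(self, text):
        return f"compressed:{text[:10]}"
```

This lets the workflow accept both newer Runnable-style tools and older BaseTool subclasses without caring which it was given.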
Step 6: Run the workflow

Call .run() with your task string. max_retries controls how many times a failing step is retried before the whole pipeline is aborted.
result = sequential_workflow.run(
    task="What is the capital of Andorra?",
    max_retries=2,
)
Step 7: Interpret the result

The return value is a dictionary:
# Successful run
{
    "status": "success",
    "completed_results": [
        {"step": "Step 1: Research Andorra", "result": "Andorra la Vella is ..."},
        {"step": "Step 2: Confirm the capital", "result": "The capital is Andorra la Vella."},
    ]
}

# Failed run (after exhausting retries on step 2)
{
    "status": "failed",
    "failed_step": "Step 2: Confirm the capital",
    "completed_results": [
        {"step": "Step 1: Research Andorra", "result": "..."},
    ]
}
Iterate over completed_results to display each step’s output:
print(f"Status: {result['status']}")
for res in result.get('completed_results', []):
    print(f"\nStep: {res['step']}\nOutput:\n{res['result']}")
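Both result shapes can be handled from one helper (the helper name is hypothetical):

```python
def summarize(result: dict) -> str:
    # Render the workflow result dict, covering both the
    # success and failure shapes shown above.
    lines = [f"Status: {result['status']}"]
    if result["status"] == "failed":
        lines.append(f"Failed step: {result['failed_step']}")
    for res in result.get("completed_results", []):
        lines.append(f"{res['step']}: {res['result']}")
    return "\n".join(lines)
```

Because completed_results is present in both shapes, a single loop covers partial progress from failed runs too.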

How retries work

For each step, the workflow runs an inner retry loop controlled by max_retries:
  1. The executor runs the step with the current accumulated context.
  2. The monitor evaluates the result against the step objective and returns {"success": bool, "feedback": str}.
  3. If success is False, the feedback string is appended to context and the executor is called again.
  4. After max_retries + 1 total attempts without success, run() returns status: "failed" immediately.
# From sequential_workflow.py — retry loop
while not success and attempts <= max_retries:
    if attempts > 0:
        current_context += f"\nPrevious attempt failed. Feedback: {feedback}"

    step_result = executor.execute_step(step, context=current_context)
    eval_data   = monitor.evaluate(objective=step, result=step_result)
    success     = eval_data.get("success", False)
    feedback    = eval_data.get("feedback", "")
    attempts   += 1
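The attempt arithmetic can be verified by simulating the loop with stub executor and monitor callables; all names below are illustrative, not the real classes:

```python
def run_step(step, execute, evaluate, max_retries=2):
    # Mirrors the retry loop above: at most max_retries + 1 attempts,
    # feeding monitor feedback back into the context between attempts.
    context, feedback, attempts, success = "", "", 0, False
    result = None
    while not success and attempts <= max_retries:
        if attempts > 0:
            context += f"\nPrevious attempt failed. Feedback: {feedback}"
        result = execute(step, context)
        evaluation = evaluate(step, result)
        success = evaluation.get("success", False)
        feedback = evaluation.get("feedback", "")
        attempts += 1
    return success, attempts, result

# Stub pair that succeeds on the third attempt.
calls = {"n": 0}
def execute(step, context):
    calls["n"] += 1
    return f"attempt {calls['n']}"
def evaluate(step, result):
    ok = calls["n"] >= 3
    return {"success": ok, "feedback": "" if ok else "try again"}

success, attempts, _ = run_step("Step 1", execute, evaluate, max_retries=2)
# With max_retries=2 the loop allows 3 attempts, so this run succeeds.
```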

Context accumulation

Each successful step appends its result to a shared context string:
context += f"\n[Step {i} Result]: {step_result}"
This context is passed to the executor on the next step, giving it the full history of previous outputs. When a compressor tool is attached (tools[0]), the context is compressed before each execution call to keep prompt size manageable.
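A minimal sketch of what the compression hook might do. The simple tail-truncation strategy here is an assumption; the real CompressContextTool may summarize rather than truncate:

```python
def compress(context: str, max_length: int = 10000) -> str:
    # Keep only the most recent max_length characters of accumulated context.
    if len(context) <= max_length:
        return context
    return "[...earlier context compressed...]\n" + context[-max_length:]

# Accumulate step results, compressing between steps as the workflow does.
context = ""
for i, step_result in enumerate(["found sources", "confirmed capital"], start=1):
    context += f"\n[Step {i} Result]: {step_result}"
    context = compress(context, max_length=50)
```

Whatever the strategy, the invariant is the same: the most recent step results survive compression, so the executor always sees them.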

Full example

This mirrors the pattern from main.py:
import os
from dotenv import load_dotenv
from langchain_ollama import ChatOllama
from agents import PlanningAgent, ExecutionAgent, MonitoringAgent
from workflows import SequentialWorkflow
from tools.compress_context_tool import CompressContextTool
from tools.curl_search_tool import CurlSearchTool

load_dotenv()

llm = ChatOllama(model=os.getenv("LOCAL_MODEL"))

compressor = CompressContextTool(max_length=10000)
curl_tool  = CurlSearchTool()

planner  = PlanningAgent(llm=llm)
executor = ExecutionAgent(llm=llm, tools=[curl_tool])
monitor  = MonitoringAgent(llm=llm)

agent_dict = {"planner": planner, "executor": executor, "monitor": monitor}

sequential_workflow = SequentialWorkflow(
    agents=agent_dict,
    tools=[compressor, curl_tool],
)

try:
    workflow_result = sequential_workflow.run(task="What is the capital of Andorra?")
    print(f"Status: {workflow_result['status']}")
    for res in workflow_result.get('completed_results', []):
        print(f"\nStep: {res['step']}\nOutput:\n{res['result']}")
except Exception as e:
    print(f"Workflow failed: {e}")
