Skip to main content
Workflows coordinate multiple agents to complete a task. The base class defines the contract; SequentialWorkflow and ParallelWorkflow provide two distinct execution strategies.

BaseWorkflow

BaseWorkflow is an abstract class that holds an agents dictionary and an optional tools list. Subclasses must implement run().
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from agents.base_agent import BaseAgent

class BaseWorkflow(ABC):
    def __init__(
        self,
        agents: Dict[str, BaseAgent],
        tools: Optional[List[Any]] = None,
        workflow_name: str = "BaseWorkflow"
    ):
        self.agents = agents
        self.tools = tools or []
        self.workflow_name = workflow_name

    def get_agent(self, name: str) -> BaseAgent:
        if name not in self.agents:
            raise ValueError(f"Agent '{name}' is not registered in this workflow.")
        return self.agents[name]

    @abstractmethod
    def run(self, task, **kwargs) -> Any:
        pass
get_agent(name) raises ValueError immediately if a required agent is missing, making misconfiguration errors easy to spot at startup rather than mid-run.

SequentialWorkflow

SequentialWorkflow implements the full Plan-Execute-Monitor loop. It requires three agents — "planner", "executor", and "monitor" — and processes steps one at a time.

Setup

from langchain_ollama import ChatOllama
from agents.planning_agent import PlanningAgent
from agents.execution_agent import ExecutionAgent
from agents.monitoring_agent import MonitoringAgent
from workflows.sequential_workflow import SequentialWorkflow
from tools.curl_search_tool import CurlSearchTool
from tools.compress_context_tool import CompressContextTool

llm = ChatOllama(model="llama3")

planner = PlanningAgent(llm=llm)
executor = ExecutionAgent(llm=llm, tools=[CurlSearchTool()])
monitor = MonitoringAgent(llm=llm)

agent_dict = {
    "planner": planner,
    "executor": executor,
    "monitor": monitor,
}

workflow = SequentialWorkflow(
    agents=agent_dict,
    tools=[CompressContextTool(max_length=10000)]
)

Running

run() accepts a single task string and an optional max_retries (default 2).
result = workflow.run(task="What is the capital of Andorra?", max_retries=2)

if result["status"] == "success":
    for step_result in result["completed_results"]:
        print(f"Step: {step_result['step']}")
        print(f"Output: {step_result['result']}")
else:
    print(f"Failed at step: {result['failed_step']}")

How it works

1

Generate plan

planner.generate_plan(task) returns a List[str] of step descriptions.
2

Execute each step

For each step, executor.execute_step(step, context=current_context) is called. The accumulated results of all previous steps are carried as context.
3

Compress context (if tools provided)

Before each executor call, if self.tools is non-empty, tools[0] is used to compress the context string. This keeps the context window manageable across many steps.
if current_context and getattr(self, 'tools', None):
    compressor = self.tools[0]
    if hasattr(compressor, 'invoke'):
        current_context = compressor.invoke(current_context)
    elif hasattr(compressor, '_run'):
        current_context = compressor._run(current_context)
4

Monitor the result

monitor.evaluate(objective=step, result=step_result) checks the output. On success, the result is appended to the running context and the workflow advances.
5

Retry or abort on failure

On failure, the feedback message is appended to context and the step is re-executed. After max_retries exhausted, the workflow returns {"status": "failed", "failed_step": ..., "completed_results": [...]}.

Return value

StatusKeys present
"success"completed_results: List[{step, result}]
"failed"failed_step: str, completed_results: List[{step, result}]

ParallelWorkflow

ParallelWorkflow runs multiple independent tasks concurrently using ThreadPoolExecutor. It only requires an "executor" agent — there is no planning or monitoring phase.

Setup

from workflows.parallel_workflow import ParallelWorkflow

workflow = ParallelWorkflow(
    agents={"executor": ExecutionAgent(llm=llm)}
)

Running

run() accepts a list of task strings (not a single string), and an optional max_workers (default 5).
tasks = [
    "Summarize the history of Python",
    "Summarize the history of JavaScript",
    "Summarize the history of Rust",
]

result = workflow.run(tasks=tasks, max_workers=3)

for task_description, output in result["results"].items():
    print(f"Task: {task_description}")
    print(f"Result: {output}\n")

How it works

def run(self, tasks: List[str], max_workers: int = 5) -> Dict[str, Any]:
    executor_agent = self.get_agent("executor")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as thread_executor:
        future_to_task = {
            thread_executor.submit(executor_agent.execute_step, task): task
            for task in tasks
        }
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                results[task] = future.result()
            except Exception as exc:
                results[task] = f"ERROR: {str(exc)}"

    return {"status": "completed", "results": results}
Each task is submitted as an independent call to executor_agent.execute_step(task) — no context is shared between tasks. Exceptions per task are caught and stored as "ERROR: ..." strings so a single failed task does not abort the rest.

Return value

Always returns {"status": "completed", "results": {task_description: output, ...}}. Per-task errors are stored as "ERROR: <message>" values rather than raised.

Choosing between sequential and parallel

SequentialWorkflow

Use when steps depend on each other. Each step’s output becomes context for the next. Includes planning, monitoring, and retry logic.Input: a single task: str

ParallelWorkflow

Use when tasks are independent of each other. All tasks run concurrently with no shared context. No planning or monitoring.Input: a list tasks: List[str]
SequentialWorkflow requires all three agents (planner, executor, monitor). ParallelWorkflow only requires executor. Passing the wrong agent dictionary raises AssertionError at construction time.

Build docs developers (and LLMs) love