How SequentialWorkflow and ParallelWorkflow coordinate agents to complete tasks.
Workflows coordinate multiple agents to complete a task. The base class defines the contract; SequentialWorkflow and ParallelWorkflow provide two distinct execution strategies.
BaseWorkflow is an abstract class that holds an agents dictionary and an optional tools list. Subclasses must implement run().
```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

from agents.base_agent import BaseAgent


class BaseWorkflow(ABC):
    def __init__(
        self,
        agents: Dict[str, BaseAgent],
        tools: Optional[List[Any]] = None,
        workflow_name: str = "BaseWorkflow"
    ):
        self.agents = agents
        self.tools = tools or []
        self.workflow_name = workflow_name

    def get_agent(self, name: str) -> BaseAgent:
        if name not in self.agents:
            raise ValueError(f"Agent '{name}' is not registered in this workflow.")
        return self.agents[name]

    @abstractmethod
    def run(self, task, **kwargs) -> Any:
        pass
```
get_agent(name) raises ValueError immediately if a required agent is missing, so a misconfigured agents dictionary fails loudly at the first lookup rather than surfacing as a confusing KeyError deeper in a run.
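For example, a workflow wired with an incomplete agents dictionary fails at the first get_agent lookup. The DemoWorkflow and BaseAgent classes below are minimal stand-ins (not the library's actual imports) so the snippet runs on its own:

```python
from typing import Any, Dict, List, Optional


class BaseAgent:  # stand-in for agents.base_agent.BaseAgent
    pass


class DemoWorkflow:
    """Minimal stand-in with the same get_agent contract as BaseWorkflow."""

    def __init__(self, agents: Dict[str, BaseAgent], tools: Optional[List[Any]] = None):
        self.agents = agents
        self.tools = tools or []

    def get_agent(self, name: str) -> BaseAgent:
        if name not in self.agents:
            raise ValueError(f"Agent '{name}' is not registered in this workflow.")
        return self.agents[name]


wf = DemoWorkflow(agents={"planner": BaseAgent()})
wf.get_agent("planner")           # returns the registered agent
try:
    wf.get_agent("executor")      # never registered -> ValueError
except ValueError as e:
    print(e)  # Agent 'executor' is not registered in this workflow.
```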
SequentialWorkflow implements the full Plan-Execute-Monitor loop. It requires three agents — "planner", "executor", and "monitor" — and processes steps one at a time.
run() accepts a single task string and an optional max_retries (default 2).
```python
result = workflow.run(task="What is the capital of Andorra?", max_retries=2)

if result["status"] == "success":
    for step_result in result["completed_results"]:
        print(f"Step: {step_result['step']}")
        print(f"Output: {step_result['result']}")
else:
    print(f"Failed at step: {result['failed_step']}")
```
1. Generate a plan
planner.generate_plan(task) returns a List[str] of step descriptions.
2. Execute each step
For each step, executor.execute_step(step, context=current_context) is called. The accumulated results of all previous steps are carried as context.
3. Compress context (if tools provided)
Before each executor call, if self.tools is non-empty, tools[0] is used to compress the context string. This keeps the context window manageable across many steps.
```python
if current_context and getattr(self, 'tools', None):
    compressor = self.tools[0]
    if hasattr(compressor, 'invoke'):
        current_context = compressor.invoke(current_context)
    elif hasattr(compressor, '_run'):
        current_context = compressor._run(current_context)
```
4. Monitor the result
monitor.evaluate(objective=step, result=step_result) checks the output. On success, the result is appended to the running context and the workflow advances.
5. Retry or abort on failure
On failure, the feedback message is appended to the context and the step is re-executed. Once max_retries is exhausted, the workflow returns {"status": "failed", "failed_step": ..., "completed_results": [...]}.
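The five steps above can be sketched end to end. The stub agents and the run_sequential helper here are hypothetical illustrations, not the library's implementation; only the method names (generate_plan, execute_step, evaluate) and the return shape mirror the contract described above:

```python
from typing import Any, Dict, List


class StubPlanner:
    def generate_plan(self, task: str) -> List[str]:
        return [f"research: {task}", f"answer: {task}"]


class StubExecutor:
    def execute_step(self, step: str, context: str = "") -> str:
        return f"done({step})"


class StubMonitor:
    def evaluate(self, objective: str, result: str) -> Dict[str, Any]:
        return {"success": True, "feedback": ""}


def run_sequential(task, planner, executor, monitor, max_retries: int = 2) -> Dict[str, Any]:
    plan = planner.generate_plan(task)                  # 1. generate a plan
    completed: List[Dict[str, str]] = []
    context = ""
    for step in plan:
        for _attempt in range(max_retries + 1):
            result = executor.execute_step(step, context=context)      # 2. execute
            verdict = monitor.evaluate(objective=step, result=result)  # 4. monitor
            if verdict["success"]:
                completed.append({"step": step, "result": result})
                context += f"\n{step}: {result}"        # carry results forward as context
                break
            context += f"\nfeedback: {verdict['feedback']}"  # 5. retry with feedback
        else:
            # retries exhausted without success -> abort
            return {"status": "failed", "failed_step": step, "completed_results": completed}
    return {"status": "success", "completed_results": completed}


result = run_sequential("What is the capital of Andorra?",
                        StubPlanner(), StubExecutor(), StubMonitor())
```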
ParallelWorkflow runs multiple independent tasks concurrently using ThreadPoolExecutor. It only requires an "executor" agent — there is no planning or monitoring phase.
run() accepts a list of task strings (not a single string), and an optional max_workers (default 5).
```python
tasks = [
    "Summarize the history of Python",
    "Summarize the history of JavaScript",
    "Summarize the history of Rust",
]
result = workflow.run(tasks=tasks, max_workers=3)

for task_description, output in result["results"].items():
    print(f"Task: {task_description}")
    print(f"Result: {output}\n")
```
```python
import concurrent.futures
from typing import Any, Dict, List


def run(self, tasks: List[str], max_workers: int = 5) -> Dict[str, Any]:
    executor_agent = self.get_agent("executor")
    results: Dict[str, Any] = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as thread_executor:
        # Submit every task as an independent execute_step call.
        future_to_task = {
            thread_executor.submit(executor_agent.execute_step, task): task
            for task in tasks
        }
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                results[task] = future.result()
            except Exception as exc:
                # A failed task is recorded, not re-raised, so the rest continue.
                results[task] = f"ERROR: {exc}"
    return {"status": "completed", "results": results}
```
Each task is submitted as an independent call to executor_agent.execute_step(task) — no context is shared between tasks. Exceptions per task are caught and stored as "ERROR: ..." strings so a single failed task does not abort the rest.
Always returns {"status": "completed", "results": {task_description: output, ...}}. Per-task errors are stored as "ERROR: <message>" values rather than raised.
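Because per-task failures surface as "ERROR: ..." strings rather than raised exceptions, callers usually want to separate them from real outputs. A small sketch, using a hypothetical result dict that follows the contract above:

```python
# Hypothetical result in the shape ParallelWorkflow.run returns.
result = {
    "status": "completed",
    "results": {
        "Summarize the history of Python": "Python was created in 1991...",
        "Summarize the history of Rust": "ERROR: request timed out",
    },
}

# Partition per-task outputs into failures and successes.
failed = {t: o for t, o in result["results"].items()
          if isinstance(o, str) and o.startswith("ERROR:")}
succeeded = {t: o for t, o in result["results"].items() if t not in failed}
```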
SequentialWorkflow
Use when steps depend on each other. Each step's output becomes context for the next. Includes planning, monitoring, and retry logic.
Input: a single task: str
ParallelWorkflow
Use when tasks are independent of each other. All tasks run concurrently with no shared context. No planning or monitoring.
Input: a list tasks: List[str]
SequentialWorkflow requires all three agents (planner, executor, monitor). ParallelWorkflow only requires executor. Passing the wrong agent dictionary raises AssertionError at construction time.
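One way a subclass can enforce its required agents at construction time is an assert in __init__. The class below is a sketch of that pattern, not the library's actual code; the REQUIRED tuple and message wording are assumptions:

```python
from typing import Any, Dict, List, Optional


class SequentialWorkflowSketch:
    """Sketch: validate the agents dict up front so bad wiring fails fast."""

    REQUIRED = ("planner", "executor", "monitor")

    def __init__(self, agents: Dict[str, Any], tools: Optional[List[Any]] = None):
        missing = [name for name in self.REQUIRED if name not in agents]
        # Construction-time check: an incomplete dict raises AssertionError here,
        # before run() is ever called.
        assert not missing, f"SequentialWorkflow requires agents: {missing}"
        self.agents = agents
        self.tools = tools or []


ok = SequentialWorkflowSketch({"planner": object(), "executor": object(), "monitor": object()})
try:
    SequentialWorkflowSketch({"executor": object()})  # missing planner and monitor
except AssertionError as e:
    print(e)
```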