Skip to main content
SequentialWorkflow extends BaseWorkflow to implement the sequential orchestration pattern. A PlanningAgent decomposes the task into steps; each step is then executed by an ExecutionAgent and validated by a MonitoringAgent before the next step begins. Failed steps are retried up to max_retries times before the pipeline aborts.

Import

from workflows import SequentialWorkflow

Constructor

agents
Dict[str, Any]
required
Dictionary of agent instances. Must contain all three of the following keys:
KeyExpected type
"planner"PlanningAgent
"executor"ExecutionAgent
"monitor"MonitoringAgent
An AssertionError is raised at construction time if any of "planner", "executor", or "monitor" are missing from agents.
tools
List[Any]
default:"None"
Optional list of tools. When provided, tools[0] is used as a context compressor before each step execution. The first tool must expose either an invoke(text) or _run(text) method (e.g. CompressContextTool).

Methods

run

Executes the full sequential plan–execute–monitor loop.
def run(self, task: str, max_retries: int = 2) -> Dict[str, Any]
task
str
required
The high-level objective. Passed directly to PlanningAgent.generate_plan().
max_retries
int
default:"2"
Maximum number of retry attempts per step before the workflow aborts. The step is tried max_retries + 1 times total.

Return value

On success — all steps validated:
status
string
Indicates the workflow completed without any unrecoverable step failure.
completed_results
List[Dict]
Ordered list of step results.
On failure — a step exhausted all retries:
status
string
Indicates the workflow was aborted due to a step that could not be validated.
failed_step
str
The step description string that caused the abort.
completed_results
List[Dict]
Results for all steps that completed successfully before the failure.

Context accumulation

After each successful step, its result is appended to a running context string as "\n[Step N Result]: <result>". This accumulated context is passed to the executor on every subsequent step.If tools is non-empty, tools[0] is invoked to compress the accumulated context before each execution call, keeping prompt size manageable.

Usage Example

from workflows import SequentialWorkflow
from agents.planning_agent import PlanningAgent
from agents.execution_agent import ExecutionAgent
from agents.monitoring_agent import MonitoringAgent
from tools.compress_context_tool import CompressContextTool

planner  = PlanningAgent(llm=llm)
executor = ExecutionAgent(llm=llm)
monitor  = MonitoringAgent(llm=llm)
compressor = CompressContextTool(max_length=3000)

workflow = SequentialWorkflow(
    agents={
        "planner":  planner,
        "executor": executor,
        "monitor":  monitor,
    },
    tools=[compressor],
)

result = workflow.run(
    task="Research and summarize recent advances in quantum computing",
    max_retries=2,
)

if result["status"] == "success":
    for item in result["completed_results"]:
        print(f"Step: {item['step']}")
        print(f"Result: {item['result']}\n")
else:
    print(f"Workflow failed at step: {result['failed_step']}")

Build docs developers (and LLMs) love