Skip to main content
ParallelWorkflow extends BaseWorkflow to implement the parallelization pattern. All tasks in the input list are dispatched simultaneously through a ThreadPoolExecutor, each handled by the same ExecutionAgent. Individual task failures are captured as error strings and do not abort the remaining tasks.

Import

from workflows import ParallelWorkflow

Constructor

agents
Dict[str, Any]
required
Dictionary of agent instances. Must contain the key "executor" mapped to an ExecutionAgent.
An AssertionError is raised at construction time if "executor" is missing from agents.
tools
List[Any]
default:"None"
Optional list of tools available to the workflow. Not used internally by the default run implementation, but accessible via self.tools for subclasses.

Methods

run

Dispatches all tasks concurrently and collects their results.
def run(self, tasks: List[str], max_workers: int = 5) -> Dict[str, Any]
tasks
List[str]
required
A list of independent task description strings. Each is executed in isolation — no shared state is passed between concurrent executions.
max_workers
int
default:"5"
Maximum number of threads in the ThreadPoolExecutor. Limits the degree of parallelism.

Return value

status
string
Always "completed" — the workflow returns after all futures resolve regardless of individual task outcomes.
results
Dict[str, str]
A flat dictionary mapping each original task string to its result string.
Tasks must be fully independent. The ExecutionAgent is shared across threads; avoid designs where tasks mutate shared state or depend on each other’s output.

Usage Example

from workflows import ParallelWorkflow
from agents.execution_agent import ExecutionAgent

executor = ExecutionAgent(llm=llm)

workflow = ParallelWorkflow(
    agents={"executor": executor},
)

tasks = [
    "Summarize the history of the Roman Empire",
    "List the top 5 programming languages in 2024",
    "Explain the basics of photosynthesis",
    "Describe the water cycle",
]

result = workflow.run(tasks, max_workers=4)

print(f"Status: {result['status']}")
for task, output in result["results"].items():
    print(f"\nTask: {task}")
    print(f"Output: {output}")

Build docs developers (and LLMs) love