ParallelWorkflow dispatches a list of independent tasks to an ExecutionAgent concurrently using Python’s ThreadPoolExecutor. Use it when tasks have no dependencies on each other and can be safely computed at the same time.

When to use parallel execution

Good fit

Tasks that are fully independent — querying multiple data sources, generating summaries for separate documents, translating a list of strings.

Not a good fit

Tasks where output from task A is required as input to task B. Use SequentialWorkflow for those instead.
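A quick rule of thumb: if the tasks could run in any order and still produce the same results, they are parallel-safe. A stdlib-only illustration (no framework code):

```python
from concurrent.futures import ThreadPoolExecutor

# Independent: each task depends only on its own input, so parallel is safe.
# pool.map preserves input order in its results.
strings = ["a", "bb", "ccc"]
with ThreadPoolExecutor(max_workers=3) as pool:
    lengths = list(pool.map(len, strings))
# lengths == [1, 2, 3]

# Dependent: step_b consumes step_a's output, so they must run in sequence.
def step_a():
    return "draft"

def step_b(text):
    return text.upper()

final = step_b(step_a())  # "DRAFT"
```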

Building a parallel workflow

1. Import the agent and workflow

from agents import ExecutionAgent
from workflows import ParallelWorkflow
2. Initialize an LLM and create the ExecutionAgent

import os
from langchain_ollama import ChatOllama

llm      = ChatOllama(model=os.getenv("LOCAL_MODEL"))
executor = ExecutionAgent(llm=llm)
ParallelWorkflow only requires an "executor" agent. Unlike SequentialWorkflow, there is no planning or monitoring phase — each task is executed directly.
3. Build the agent dictionary

The dictionary must contain the key "executor". The constructor asserts this at init time:
# From parallel_workflow.py:
# assert "executor" in self.agents, "ParallelWorkflow requires an 'executor' agent."

parallel_workflow = ParallelWorkflow(agents={"executor": executor})
4. Define your list of tasks

Each element in the list is an independent task string that the executor will receive as its step_description:
tasks = [
    "What is the capital of France?",
    "What is the capital of Germany?",
    "What is the capital of Japan?",
    "What is the capital of Brazil?",
    "What is the capital of Australia?",
]
5. Run the workflow

Pass the task list and the maximum number of concurrent threads:
result = parallel_workflow.run(tasks=tasks, max_workers=5)
max_workers maps directly to ThreadPoolExecutor(max_workers=...). The default is 5.
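The effect of max_workers can be observed with plain ThreadPoolExecutor code (stdlib only, independent of the framework): the pool never runs more than max_workers tasks simultaneously.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

active = 0  # tasks currently running
peak = 0    # highest concurrency observed
lock = threading.Lock()

def task(_):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulate work so tasks overlap
    with lock:
        active -= 1

# 8 tasks, but only 2 worker threads: at most 2 run at once.
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(task, range(8)))

print(peak)  # never exceeds 2
```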
6. Interpret the result

The return value always has status: "completed" and a results dictionary mapping each original task string to its output:
{
    "status": "completed",
    "results": {
        "What is the capital of France?": "The capital of France is Paris.",
        "What is the capital of Germany?": "The capital of Germany is Berlin.",
        "What is the capital of Japan?": "The capital of Japan is Tokyo.",
        "What is the capital of Brazil?": "The capital of Brazil is Brasília.",
        "What is the capital of Australia?": "The capital of Australia is Canberra.",
    }
}
If a task raises an exception, its entry is set to "ERROR: <message>" and the rest of the tasks continue unaffected.
print(f"Status: {result['status']}")
for task, output in result['results'].items():
    print(f"\nTask : {task}\nResult: {output}")
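Because failures are reported in-band as "ERROR: ..." strings rather than raised, a caller can split successes from failures with a simple scan. A sketch (the result dict here is illustrative, not real output):

```python
# Illustrative result in the shape documented above.
result = {
    "status": "completed",
    "results": {
        "task A": "answer A",
        "task B": "ERROR: connection timed out",
    },
}

# Partition on the documented "ERROR: " prefix.
succeeded = {t: out for t, out in result["results"].items()
             if not out.startswith("ERROR: ")}
failed = {t: out for t, out in result["results"].items()
          if out.startswith("ERROR: ")}

print(f"{len(succeeded)} succeeded, {len(failed)} failed")
```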

Thread safety

All concurrent calls go through ExecutionAgent.execute_step, which is a stateless method — it does not write to any shared mutable state on the agent instance. The ThreadPoolExecutor therefore runs it safely across threads.
# From parallel_workflow.py — how tasks are dispatched
def execute_wrapper(task: str) -> str:
    return executor_agent.execute_step(task)

results: dict = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as thread_executor:
    future_to_task = {
        thread_executor.submit(execute_wrapper, task): task
        for task in tasks
    }
    for future in concurrent.futures.as_completed(future_to_task):
        task = future_to_task[future]
        try:
            results[task] = future.result()
        except Exception as exc:
            # A failed task is recorded, not re-raised; the others continue.
            results[task] = f"ERROR: {exc}"
If you attach tools to ExecutionAgent that hold shared mutable state (e.g., a counter, a file handle), make sure those tools are thread-safe before using them in ParallelWorkflow.
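For instance, a shared counter tool should guard its state with a lock so concurrent increments are not lost. A minimal sketch (ThreadSafeCounter is hypothetical, not part of the framework):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class ThreadSafeCounter:
    """Illustrative shared-state tool: a lock makes each increment atomic."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        # Without the lock, concurrent read-modify-write cycles can
        # interleave and drop updates.
        with self._lock:
            self._value += 1
            return self._value

counter = ThreadSafeCounter()
with ThreadPoolExecutor(max_workers=5) as pool:
    list(pool.map(lambda _: counter.increment(), range(100)))

print(counter._value)  # 100: no lost updates
```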

Full example

import os
from dotenv import load_dotenv
from langchain_ollama import ChatOllama
from agents import ExecutionAgent
from workflows import ParallelWorkflow

load_dotenv()

llm      = ChatOllama(model=os.getenv("LOCAL_MODEL"))
executor = ExecutionAgent(llm=llm)

parallel_workflow = ParallelWorkflow(agents={"executor": executor})

tasks = [
    "Summarize the benefits of exercise in 2 sentences.",
    "Summarize the benefits of sleep in 2 sentences.",
    "Summarize the benefits of hydration in 2 sentences.",
]

result = parallel_workflow.run(tasks=tasks, max_workers=3)

print(f"Status: {result['status']}")
for task, output in result['results'].items():
    print(f"\nTask : {task}\nResult: {output}")
