
Overview

Multi-agent workflows enable complex interactions where multiple agents collaborate, compete, or verify each other’s work. This guide covers implementing multi-agent patterns using rLLM’s workflow system.

Why Multi-Agent?

Use cases:
  • Solver-Judge: One agent solves, another verifies
  • Debate: Multiple agents argue different positions
  • Collaborative: Agents work together on subtasks
  • Ensemble: Multiple solutions, select best one
Benefits:
  • Improved accuracy through verification
  • More diverse solutions
  • Natural curriculum learning
  • Better credit assignment

Workflow Architecture

Multi-agent systems use Workflow instead of Agent + Environment:
from rllm.workflows.workflow import Workflow
from rllm.agents.agent import Episode, Trajectory

class MultiAgentWorkflow(Workflow):
    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        # Implement custom multi-agent logic
        # Return Episode with multiple trajectories
        return Episode(
            id=uid,
            task=task,
            trajectories=[solver_traj, judge_traj],  # Multiple trajectories!
            is_correct=final_result,
            metrics={"solver_acc": ..., "judge_acc": ...}
        )
See rllm/workflows/workflow.py:32.

Solver-Judge Pattern

The most common multi-agent pattern: one agent generates solutions, another verifies them.

Implementation

Step 1: Define Solver agent

Creates candidate solutions:
from rllm.engine import RolloutEngine, ModelOutput
from rllm.agents.agent import Trajectory, Step
import asyncio

class Solver:
    def __init__(self, rollout_engine: RolloutEngine, **kwargs):
        self.rollout_engine = rollout_engine
    
    async def generate_solution(self, problem: str) -> Trajectory:
        messages = [
            {
                "role": "user",
                "content": f"{problem}. Output the final answer within <answer>...</answer>"
            }
        ]
        
        output: ModelOutput = await self.rollout_engine.get_model_response(messages)
        
        return Trajectory(
            name="solver",
            steps=[
                Step(
                    chat_completions=messages + [
                        {
                            "role": "assistant",
                            "content": output.content,
                            "reasoning": output.reasoning
                        }
                    ],
                    thought=output.reasoning,
                    action=self._parse_solver_response(output.content),
                    model_output=output,
                )
            ],
        )
From examples/solver_judge/solver_judge_flow.py:10.
Step 2: Generate multiple solutions

Create diverse candidates:
async def generate_solutions(
    self,
    problem: str,
    n_solutions: int = 2
) -> list[Trajectory]:
    # Generate solutions in parallel
    tasks = [
        asyncio.create_task(self.generate_solution(problem))
        for _ in range(n_solutions)
    ]
    return await asyncio.gather(*tasks)
From examples/solver_judge/solver_judge_flow.py:29.
Step 3: Define Judge agent

Evaluates and selects solutions:
class Judge:
    def __init__(self, rollout_engine: RolloutEngine, **kwargs):
        self.rollout_engine = rollout_engine
    
    async def judge_solutions(
        self,
        problem: str,
        solutions: list[str]
    ) -> Trajectory:
        messages = [
            {
                "role": "user",
                "content": self._create_judge_prompt(problem, solutions)
            }
        ]
        
        output: ModelOutput = await self.rollout_engine.get_model_response(messages)
        
        return Trajectory(
            name="judge",
            steps=[
                Step(
                    chat_completions=messages + [
                        {
                            "role": "assistant",
                            "content": output.content,
                            "reasoning": output.reasoning
                        }
                    ],
                    thought=output.reasoning,
                    action=self._parse_judge_response(output.content, solutions),
                    model_output=output,
                )
            ],
        )
From examples/solver_judge/solver_judge_flow.py:41.
Step 4: Create judge prompt

Format solutions for evaluation:
def _create_judge_prompt(self, problem: str, solutions: list[str]) -> str:
    prompt = f"""You are an expert verifier. Given a problem and multiple solution attempts, select the correct solution.
    
Problem:
{problem}

Solutions to evaluate:
"""
    
    for i, solution in enumerate(solutions, 1):
        prompt += f"\nSolution {i}:\n{solution}\n"
    
    prompt += """
Output the index of your selected solution within <answer>...</answer> tags, e.g., <answer>1</answer> for the first solution.
If multiple solutions are correct, output the index of the first correct solution."""
    
    return prompt
From examples/solver_judge/solver_judge_flow.py:71.
Step 5: Combine in workflow

Orchestrate solver and judge:
from rllm.workflows.workflow import Workflow
from rllm.rewards.reward_fn import RewardFunction

class SolverJudgeWorkflow(Workflow):
    def __init__(
        self,
        rollout_engine: RolloutEngine,
        n_solutions: int = 2,
        reward_function: RewardFunction | None = None,
        **kwargs
    ):
        super().__init__(rollout_engine, **kwargs)
        self.n_solutions = n_solutions
        self.reward_function = reward_function
        self.solver = Solver(rollout_engine)
        self.judge = Judge(rollout_engine)
    
    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        self.reset(task, uid)
        problem = task["question"]
        
        # Step 1: Solver generates multiple solutions in parallel
        solver_trajectories = await self.solver.generate_solutions(
            problem,
            self.n_solutions
        )
        
        # Assign rewards to solver trajectories
        solutions = []
        for traj in solver_trajectories:
            solution = traj.steps[0].action
            solutions.append(solution)
            reward = self.reward_function(task, solution).reward
            traj.steps[0].reward = reward
        
        # Step 2: Judge selects the best solution
        judge_trajectory = await self.judge.judge_solutions(
            problem,
            solutions
        )
        selected_solution = judge_trajectory.steps[0].action
        
        # Evaluate the selected solution
        reward_result = self.reward_function(task, selected_solution)
        judge_trajectory.steps[0].reward = reward_result.reward
        is_correct = reward_result.is_correct
        
        # Compute metrics
        solver_acc = sum(traj.steps[0].reward for traj in solver_trajectories) / len(solver_trajectories)
        judge_acc = int(is_correct)
        
        # Step 3: Return episode with multiple trajectories
        return Episode(
            id=uid,
            task=task,
            trajectories=[*solver_trajectories, judge_trajectory],
            is_correct=is_correct,
            metrics={"solver_acc": solver_acc, "judge_acc": judge_acc},
        )
From examples/solver_judge/solver_judge_flow.py:92.

Complete Solver-Judge Example

import asyncio
import re
from rllm.agents.agent import Episode, Step, Trajectory
from rllm.engine import ModelOutput, RolloutEngine
from rllm.rewards.reward_fn import RewardFunction
from rllm.workflows.workflow import Workflow

class Solver:
    def __init__(self, rollout_engine: RolloutEngine, **kwargs):
        self.rollout_engine = rollout_engine
    
    async def generate_solution(self, problem: str) -> Trajectory:
        messages = [
            {
                "role": "user",
                "content": f"{problem}. Output the final answer within <answer>...</answer>"
            }
        ]
        output: ModelOutput = await self.rollout_engine.get_model_response(messages)
        return Trajectory(
            name="solver",
            steps=[
                Step(
                    chat_completions=messages + [
                        {
                            "role": "assistant",
                            "content": output.content,
                            "reasoning": output.reasoning
                        }
                    ],
                    thought=output.reasoning,
                    action=self._parse_solver_response(output.content),
                    model_output=output,
                )
            ],
        )
    
    async def generate_solutions(self, problem: str, n_solutions: int = 2) -> list[Trajectory]:
        tasks = [
            asyncio.create_task(self.generate_solution(problem))
            for _ in range(n_solutions)
        ]
        return await asyncio.gather(*tasks)
    
    def _parse_solver_response(self, response: str) -> str:
        answer_match = re.search(r"<answer>(.*?)</answer>", response, re.IGNORECASE | re.DOTALL)
        if answer_match:
            return f"<answer>{answer_match.group(1).strip()}</answer>"
        else:
            return "No solution found"

class Judge:
    def __init__(self, rollout_engine: RolloutEngine, **kwargs):
        self.rollout_engine = rollout_engine
    
    async def judge_solutions(self, problem: str, solutions: list[str]) -> Trajectory:
        messages = [{"role": "user", "content": self._create_judge_prompt(problem, solutions)}]
        output: ModelOutput = await self.rollout_engine.get_model_response(messages)
        return Trajectory(
            name="judge",
            steps=[
                Step(
                    chat_completions=messages + [
                        {
                            "role": "assistant",
                            "content": output.content,
                            "reasoning": output.reasoning
                        }
                    ],
                    thought=output.reasoning,
                    action=self._parse_judge_response(output.content, solutions),
                    model_output=output,
                )
            ],
        )
    
    def _parse_judge_response(self, response: str, solutions: list[str]) -> str:
        answer_match = re.search(r"<answer>(.*?)</answer>", response, re.IGNORECASE | re.DOTALL)
        if answer_match:
            answer_text = answer_match.group(1).strip()
            try:
                solution_index = int(answer_text)
            except ValueError:
                return ""
            # Guard the range explicitly: int("0") or a negative number would
            # otherwise index from the end of the list and silently return the
            # wrong solution
            if 1 <= solution_index <= len(solutions):
                return solutions[solution_index - 1]
        return ""
    
    def _create_judge_prompt(self, problem: str, solutions: list[str]) -> str:
        prompt = f"""You are an expert verifier. Given a problem and multiple solution attempts, select a correct solution.

Problem:
{problem}

Solutions to evaluate:
"""
        for i, solution in enumerate(solutions, 1):
            prompt += f"\nSolution {i}:\n{solution}\n"
        
        prompt += """
Output the index of your selected solution within <answer>...</answer> tags, e.g., <answer>1</answer> for the first solution.
If multiple solutions are correct, output the index of the first correct solution."""
        return prompt

class SolverJudgeWorkflow(Workflow):
    def __init__(
        self,
        rollout_engine: RolloutEngine,
        n_solutions: int = 2,
        reward_function: RewardFunction | None = None,
        **kwargs
    ):
        super().__init__(rollout_engine, **kwargs)
        self.n_solutions = n_solutions
        self.reward_function = reward_function
        self.solver = Solver(rollout_engine)
        self.judge = Judge(rollout_engine)
    
    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        self.reset(task, uid)
        problem = task["question"]
        
        # Step 1: Solver generates multiple solutions in parallel
        solver_trajectories = await self.solver.generate_solutions(problem, self.n_solutions)
        
        # Assign rewards to solver trajectories
        solutions = []
        for traj in solver_trajectories:
            solution = traj.steps[0].action
            solutions.append(solution)
            reward = self.reward_function(task, solution).reward
            traj.steps[0].reward = reward
        
        # Step 2: Judge selects the best solution
        judge_trajectory = await self.judge.judge_solutions(problem, solutions)
        selected_solution = judge_trajectory.steps[0].action
        
        # Evaluate the selected solution
        reward_result = self.reward_function(task, selected_solution)
        judge_trajectory.steps[0].reward = reward_result.reward
        is_correct = reward_result.is_correct
        
        # Compute metrics
        solver_acc = sum(traj.steps[0].reward for traj in solver_trajectories) / len(solver_trajectories)
        judge_acc = int(is_correct)
        
        # Step 3: Return episode with multiple trajectories
        return Episode(
            id=uid,
            task=task,
            trajectories=[*solver_trajectories, judge_trajectory],
            is_correct=is_correct,
            metrics={"solver_acc": solver_acc, "judge_acc": judge_acc},
        )
From examples/solver_judge/solver_judge_flow.py:1.

Training Multi-Agent Workflows

import hydra
from rllm.trainer.agent_trainer import AgentTrainer
from rllm.data.dataset import DatasetRegistry
from rllm.rewards.reward_fn import math_reward_fn

@hydra.main(
    config_path="pkg://rllm.trainer.config",
    config_name="agent_ppo_trainer"
)
def main(config):
    train_dataset = DatasetRegistry.load_dataset("countdown", "train")
    test_dataset = DatasetRegistry.load_dataset("countdown", "test")
    
    # Configure workflow
    config.rllm.workflow.use_workflow = True
    config.rllm.workflow.n_parallel_tasks = 256
    
    trainer = AgentTrainer(
        workflow_class=SolverJudgeWorkflow,
        workflow_args={
            "n_solutions": 2,
            "reward_function": math_reward_fn,
        },
        config=config,
        train_dataset=train_dataset,
        val_dataset=test_dataset,
    )
    
    trainer.train()

if __name__ == "__main__":
    main()

Advanced Patterns

Different Models for Different Roles

class SolverJudgeWorkflow(Workflow):
    def __init__(self, rollout_engine, judge_engine, **kwargs):
        super().__init__(rollout_engine, **kwargs)
        self.solver = Solver(rollout_engine)  # Fast model
        self.judge = Judge(judge_engine)      # Stronger model

Multi-Turn Collaboration

class CollaborativeWorkflow(Workflow):
    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        trajectories = []
        
        # Agent 1: Initial solution
        traj1 = await self.agent1.generate(task)
        trajectories.append(traj1)
        
        # Agent 2: Refine solution
        traj2 = await self.agent2.refine(task, traj1.steps[0].action)
        trajectories.append(traj2)
        
        # Agent 3: Final verification
        traj3 = await self.agent3.verify(task, traj2.steps[0].action)
        trajectories.append(traj3)
        
        return Episode(
            id=uid,
            task=task,
            trajectories=trajectories,
            is_correct=traj3.steps[0].reward > 0.5,
        )

Ensemble Voting

from collections import Counter

class EnsembleWorkflow(Workflow):
    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        # Generate multiple candidate solutions
        trajectories = await self.generate_solutions(task, n=5)
        
        # Extract answers
        answers = [traj.steps[0].action for traj in trajectories]
        
        # Vote for the most common answer (majority voting)
        vote_counts = Counter(answers)
        final_answer = vote_counts.most_common(1)[0][0]
        
        # Evaluate final answer
        reward_result = self.reward_fn(task, final_answer)
        
        return Episode(
            id=uid,
            task=task,
            trajectories=trajectories,
            is_correct=reward_result.is_correct,
            metrics={"vote_counts": dict(vote_counts)},
        )

Trajectory Grouping

For advanced advantage computation, group trajectories by role:
from rllm.agents.agent import TrajectoryGroup

class SolverJudgeWorkflow(Workflow):
    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        # ... generate trajectories ...
        
        # Group trajectories for advantage computation
        episode = Episode(
            id=uid,
            task=task,
            trajectories=[*solver_trajectories, judge_trajectory],
            is_correct=is_correct,
        )
        
        # Custom grouping logic (e.g., wrapping related trajectories in a TrajectoryGroup)
        # can be added here. See rllm/trainer/distill/advantage.py for examples
        
        return episode
The framework automatically groups trajectories by name for advantage computation.
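As an illustration of that by-name grouping, here is a minimal sketch using a stand-in dataclass rather than rLLM's real `Trajectory`:

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class DemoTrajectory:  # stand-in for rllm's Trajectory
    name: str
    reward: float

def group_by_name(trajectories: list[DemoTrajectory]) -> dict[str, list[DemoTrajectory]]:
    """Bucket trajectories by role name, mirroring how per-role advantages are computed."""
    groups: dict[str, list[DemoTrajectory]] = defaultdict(list)
    for traj in trajectories:
        groups[traj.name].append(traj)
    return dict(groups)

trajs = [DemoTrajectory("solver", 1.0), DemoTrajectory("solver", 0.0), DemoTrajectory("judge", 1.0)]
groups = group_by_name(trajs)
print(sorted(groups))         # ['judge', 'solver']
print(len(groups["solver"]))  # 2
```

Grouping by role means solver trajectories are baselined against other solver attempts on the same task, not against the judge's trajectory.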

Configuration

Enable Workflow Mode

config.rllm.workflow.use_workflow = True
config.rllm.workflow.n_parallel_tasks = 256

Set Number of Solutions

workflow_args = {
    "n_solutions": 4,  # More solutions = better coverage
    "reward_function": math_reward_fn,
}

Adjust Timeouts

config.rllm.workflow.timeout = 300  # Seconds per episode

Best Practices

  1. Start with 2 solutions: Balance diversity and compute cost
  2. Use async for parallelism: Generate solutions concurrently
  3. Assign rewards to all trajectories: Even incorrect ones for learning
  4. Track per-role metrics: Monitor solver and judge performance separately
  5. Use different prompts: Solver should explore, judge should verify
  6. Handle parsing errors: Return empty string rather than crashing
  7. Test components separately: Debug solver and judge independently
Multi-agent workflows require more compute than single-agent setups. Start with n_solutions=2 and increase only if needed.

Common Issues

Judge Always Selects First Solution

  1. Improve judge prompt with clearer criteria
  2. Add few-shot examples to judge prompt
  3. Increase judge model size (use stronger model)
  4. Randomize solution order in prompt

Solver Solutions Too Similar

  1. Increase temperature in generation config
  2. Use top_p sampling instead of greedy
  3. Add diverse few-shot examples
  4. Modify prompt to encourage different approaches
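For the last point, a cheap way to nudge candidates apart is to cycle through instruction variants when building solver prompts. A sketch with hypothetical variant strings:

```python
PROMPT_VARIANTS = [
    "Solve step by step.",
    "Try an unconventional approach.",
    "Work backwards from the answer format.",
]

def build_solver_prompts(problem: str, n_solutions: int) -> list[str]:
    """Attach a different instruction variant to each candidate's prompt."""
    return [
        f"{problem}. {PROMPT_VARIANTS[i % len(PROMPT_VARIANTS)]} "
        "Output the final answer within <answer>...</answer>"
        for i in range(n_solutions)
    ]

prompts = build_solver_prompts("What is 2 + 2?", 2)
print(len(prompts))              # 2
print(prompts[0] != prompts[1])  # True
```

This composes with temperature/top_p sampling: the variants diversify the conditioning while sampling diversifies the decoding.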

Training Not Converging

  1. Check that rewards are being assigned to all trajectories
  2. Verify metrics show meaningful differences
  3. Reduce n_solutions if variance is too high
  4. Ensure judge is learning from solver improvements
The solver-judge pattern creates a natural curriculum: as solvers improve, the judge learns to distinguish increasingly subtle differences.
