
Overview

A rollout is a complete interaction sequence from initial prompt to final completion, including all model responses, environment feedback, tool executions, and computed rewards. Each rollout produces a State object that tracks the full interaction history.
import verifiers as vf

# Single rollout
state = await env.rollout(
    input=dataset[0],
    client=client,
    model="gpt-4",
    sampling_args={"temperature": 0.7}
)

# state["completion"] - final response
# state["reward"] - computed reward
# state["trajectory"] - turn-by-turn history

Rollout Lifecycle

Every rollout follows this sequence:

1. State Initialization

state = await env.init_state(
    input=RolloutInput({
        "prompt": [{"role": "user", "content": "What is 2+2?"}],
        "answer": "4",
        "example_id": 0,
        "task": "math-qa"
    }),
    client=client,
    model="gpt-4",
    sampling_args={"temperature": 0.7}
)
Initial state fields:
State({
    # Input data (forwarded from input)
    "prompt": [...],
    "answer": "4",
    "example_id": 0,
    "task": "math-qa",
    "info": {},
    
    # Configuration
    "client": client,
    "model": "gpt-4",
    "sampling_args": {"temperature": 0.7},
    "tool_defs": [],
    
    # Status
    "is_completed": False,
    "is_truncated": False,
    
    # Outputs (populated during rollout)
    "trajectory": [],
    "completion": None,
    "trajectory_id": "a3f9c2e1...",  # unique rollout ID
    
    # Scoring (populated after rollout)
    "reward": None,
    "advantage": None,
    "metrics": None,
    "error": None,
    "final_env_response": None,
    
    # Timing
    "timing": {
        "start_time": 1234567890.123,
        "generation_ms": 0.0,
        "scoring_ms": 0.0,
        "total_ms": 0.0
    },
    
    # Usage tracking
    "usage": {"input_tokens": 0.0, "output_tokens": 0.0}
})

2. Setup State (Per-Rollout Initialization)

async def setup_state(self, state: State) -> State:
    """Override to initialize per-rollout resources."""
    state["game_board"] = initialize_board()
    state["score"] = 0
    state["session_id"] = await create_session()
    return await super().setup_state(state)
Called before the rollout loop begins. Used for:
  • Creating per-rollout resources (sandbox sessions, game state)
  • Initializing environment-specific state fields
  • Queueing asynchronous setup (e.g., sandbox provisioning)

3. Rollout Loop

For MultiTurnEnv and subclasses:
while not await env.is_completed(state):
    # Build prompt messages (may trigger an environment response)
    prompt_messages = await env.get_prompt_messages(state)
    
    # If the environment signaled a final response, skip the model call;
    # the has_final_env_response stop condition ends the loop
    if state.get("final_env_response") is not None:
        continue
    
    # Get model response
    response = await env.get_model_response(state, prompt_messages)
    
    # Record the turn in the trajectory
    await env.add_model_response(state, prompt_messages, response)
Turn sequence:
  1. get_prompt_messages() - Build messages for model
  2. get_model_response() - Call model API
  3. add_model_response() - Record in trajectory
  4. is_completed() - Check all stop conditions

4. Completion Rendering

await env.render_completion(state)
# Sets state["completion"] from trajectory
Extracts the final conversation from the trajectory. Default behavior:
state["completion"] = full_conversation[len(prompt_messages):]
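The slicing above can be seen in a self-contained sketch (illustrative only, not the library's exact implementation): the completion is the tail of the full conversation after the original prompt messages.

```python
# Illustrative: completion = everything after the original prompt messages
prompt_messages = [{"role": "user", "content": "What is 2+2?"}]
full_conversation = prompt_messages + [{"role": "assistant", "content": "4"}]

completion = full_conversation[len(prompt_messages):]
print(completion)  # [{'role': 'assistant', 'content': '4'}]
```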

5. Cleanup

@vf.cleanup
async def cleanup_sandbox(self, state: State):
    await destroy_sandbox(state["session_id"])

@vf.cleanup
async def save_game_log(self, state: State):
    await save_log(state["game_id"], state["trajectory"])
All @vf.cleanup handlers are called after is_completed() returns True.

6. Scoring

# Individual scoring
await env.rubric.score_rollout(state)

# Group scoring (default for evaluation)
await env.rubric.score_group([state1, state2, state3, state4])
Sets state["reward"], state["advantage"], and state["metrics"].
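For intuition, group scoring can assign group-relative advantages. The sketch below shows one common scheme (reward minus the group mean); the rubric's actual computation may differ.

```python
# Illustrative group-relative advantages: each rollout's advantage is its
# reward minus the mean reward of its group (one common scheme, not
# necessarily the rubric's exact formula)
rewards = [1.0, 0.0, 1.0, 0.0]
mean_reward = sum(rewards) / len(rewards)
advantages = [r - mean_reward for r in rewards]
print(advantages)  # [0.5, -0.5, 0.5, -0.5]
```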

State Object

Type Definition

from verifiers.types import State

class State(dict):
    """Dict subclass with input field forwarding."""
    INPUT_FIELDS = ["prompt", "answer", "task", "info", "example_id"]

Field Forwarding

Reads of any key in INPUT_FIELDS are automatically forwarded to the nested input object:
state = State(input={
    "prompt": [...],
    "answer": "4",
    "example_id": 0
})

# These are equivalent:
state["prompt"]  # forwards to state["input"]["prompt"]
state["answer"]  # forwards to state["input"]["answer"]
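A minimal sketch of how such forwarding could be implemented as a dict subclass; the real State class in verifiers may differ in detail.

```python
# Hypothetical sketch of input-field forwarding (not the library's internals)
class ForwardingState(dict):
    INPUT_FIELDS = ["prompt", "answer", "task", "info", "example_id"]

    def __getitem__(self, key):
        # Forward reads of input fields to the nested "input" dict
        if key in self.INPUT_FIELDS and not dict.__contains__(self, key):
            return dict.__getitem__(self, "input")[key]
        return dict.__getitem__(self, key)

state = ForwardingState(input={"prompt": [], "answer": "4", "example_id": 0})
print(state["answer"])            # "4", forwarded to state["input"]["answer"]
print(state["input"]["answer"])   # "4", direct access still works
```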

Usage Tracking

# Automatically tracked during model calls
state["usage"] = {
    "input_tokens": 150.0,
    "output_tokens": 80.0
}

# Access in reward functions
async def token_efficiency(state) -> float:
    total = state["usage"]["input_tokens"] + state["usage"]["output_tokens"]
    return 1.0 if total < 500 else 0.5

Custom State Fields

Environments can add arbitrary fields:
async def setup_state(self, state: State) -> State:
    state["game_board"] = [[0] * 8 for _ in range(8)]
    state["player_position"] = (0, 0)
    state["inventory"] = []
    return await super().setup_state(state)

async def env_response(self, messages, state) -> vf.Messages:
    # Use custom state fields
    board = state["game_board"]
    position = state["player_position"]
    # ...

Trajectory

The trajectory field records every turn in multi-turn rollouts:

TrajectoryStep Type

from verifiers.types import TrajectoryStep

class TrajectoryStep(TypedDict):
    prompt: Messages              # Input messages for this turn
    completion: Messages          # Model output for this turn
    response: Response            # Raw API response
    tokens: TrajectoryStepTokens | None  # Token-level data for training
    reward: float | None          # Per-turn reward (usually None, set in add_trajectory_step)
    advantage: float | None       # Per-turn advantage (usually None)
    is_truncated: bool            # Whether generation was truncated
    trajectory_id: str            # Rollout UUID
    extras: dict[str, Any]        # Custom per-turn metadata

Accessing Trajectory

state["trajectory"]  # list[TrajectoryStep]

# Get last turn
last_turn = state["trajectory"][-1]
last_prompt = last_turn["prompt"]
last_completion = last_turn["completion"]

# Count turns
num_turns = len(state["trajectory"])

# Iterate all turns
for turn in state["trajectory"]:
    print(turn["prompt"], turn["completion"])

Custom Trajectory Handling

Override add_trajectory_step() to customize metadata:
async def add_trajectory_step(self, state: State, trajectory_step: TrajectoryStep):
    # Add custom metadata
    trajectory_step["extras"]["board_state"] = state["game_board"].copy()
    trajectory_step["extras"]["valid_moves"] = get_valid_moves(state)
    
    # Set intermediate rewards
    if self.check_intermediate_goal(state):
        trajectory_step["reward"] = 0.5
    
    await super().add_trajectory_step(state, trajectory_step)

Token-Level Data

For RL training, trajectories include token IDs and logprobs:
class TrajectoryStepTokens(TypedDict):
    prompt_ids: list[int]              # Input token IDs
    prompt_mask: list[int]             # Attention mask
    completion_ids: list[int]          # Output token IDs
    completion_mask: list[int]         # Attention mask
    completion_logprobs: list[float]   # Log probabilities
    overlong_prompt: bool              # Prompt exceeded max_seq_len
    is_truncated: bool                 # Output was truncated
    routed_experts: list[list[list[int]]] | None  # MoE routing (for MoE models)

Stop Conditions

Stop conditions are checked after every model response via is_completed():

Built-in Stop Conditions

MultiTurnEnv includes:
@vf.stop(priority=100)  # checked first
async def has_error(self, state: State) -> bool:
    return state.get("error") is not None

@vf.stop
async def prompt_too_long(self, state: State) -> bool:
    return state.get("prompt_too_long", False)

@vf.stop
async def max_turns_reached(self, state: State) -> bool:
    return len(state["trajectory"]) >= self.max_turns and self.max_turns > 0

@vf.stop
async def has_final_env_response(self, state: State) -> bool:
    return state.get("final_env_response") is not None
ToolEnv adds:
@vf.stop
async def no_tools_called(self, state: State) -> bool:
    """Stop when model responds without tool calls."""
    if not state["trajectory"]:
        return False
    last_completion = state["trajectory"][-1]["completion"]
    return not any(msg.get("tool_calls") for msg in last_completion)

Custom Stop Conditions

class MyGameEnv(vf.MultiTurnEnv):
    @vf.stop(priority=10)  # checked before default-priority conditions
    async def game_won(self, state: State) -> bool:
        return state.get("won", False)
    
    @vf.stop
    async def game_lost(self, state: State) -> bool:
        return state.get("lives", 1) <= 0
    
    @vf.stop(priority=-10)  # checked last; give expensive conditions low priority
    async def timeout_reached(self, state: State) -> bool:
        elapsed = time.time() - state["timing"]["start_time"]
        return elapsed > 300.0  # 5 minutes

Stop Condition Behavior

When any stop condition returns True:
  1. state["is_completed"] = True
  2. state["is_truncated"] set if any trajectory step was truncated
  3. state["stop_condition"] set to condition name
  4. Timing fields updated
  5. All @vf.cleanup handlers called
  6. Loop exits
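The priority-ordered checking described above can be sketched as follows. The function and tuple layout are hypothetical, purely to illustrate the higher-priority-first semantics, not the library's internals.

```python
# Hypothetical sketch: check stop conditions in descending priority order,
# recording the name of the first condition that fires
import asyncio

async def has_error(state):
    return state.get("error") is not None

async def max_turns_reached(state):
    return len(state["trajectory"]) >= 3

async def check_stop(state, conditions):
    # conditions: (priority, name, predicate); higher priority checked first
    for _, name, predicate in sorted(conditions, key=lambda c: c[0], reverse=True):
        if await predicate(state):
            state["is_completed"] = True
            state["stop_condition"] = name
            return True
    return False

conditions = [
    (100, "has_error", has_error),
    (0, "max_turns_reached", max_turns_reached),
]
state = {"trajectory": [{}, {}, {}]}
stopped = asyncio.run(check_stop(state, conditions))
print(state["stop_condition"])  # max_turns_reached
```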

Error Handling

Error Hierarchy

vf.Error (base)
├── vf.ModelError
│   └── vf.EmptyModelResponseError
├── vf.OverlongPromptError
├── vf.ToolError
│   ├── vf.ToolParseError
│   └── vf.ToolCallError
└── vf.InfraError
    ├── vf.SandboxError
    └── vf.TunnelError

Automatic Error Catching

All vf.Error exceptions raised during rollout are caught and stored:
try:
    response = await self.get_model_response(state, prompt_messages)
except vf.Error as e:
    state["error"] = e
    # Triggers has_error stop condition on next is_completed() check

Configurable Tool Error Handling

env = vf.ToolEnv(
    tools=[my_tool],
    stop_errors=[vf.ToolParseError],  # stop on parse errors
    # Other tool errors returned as tool messages for model recovery
)

Error Access

if state.get("error"):
    print(f"Rollout failed: {state['error']}")
    print(f"Stop condition: {state['stop_condition']}")  # "has_error"

Timing

Rollout timing is tracked automatically:
state["timing"] = {
    "start_time": 1234567890.123,      # Unix timestamp
    "generation_ms": 2500.0,           # Model generation time
    "scoring_ms": 150.0,               # Rubric scoring time
    "total_ms": 2650.0                 # Total rollout time
}
Timing flow:
  1. start_time set in init_state()
  2. generation_ms computed when stop condition triggers
  3. scoring_ms tracked by rubric
  4. total_ms = generation_ms + scoring_ms

RolloutInput and RolloutOutput

RolloutInput

Input format for rollouts:
from verifiers.types import RolloutInput

class RolloutInput(TypedDict):
    prompt: Messages          # Required: input messages
    example_id: int           # Required: for grouping
    task: str                 # Required: task identifier
    answer: str               # Optional: ground truth
    info: Info                # Optional: metadata dict

RolloutOutput

Serialized output from rollouts:
from verifiers.types import RolloutOutput

class RolloutOutput(dict):
    # Always present
    example_id: int
    task: str
    prompt: Messages | None
    completion: Messages | None
    reward: float
    timing: RolloutTiming
    is_completed: bool
    is_truncated: bool
    metrics: dict[str, float]
    
    # Optional
    answer: str
    info: Info
    error: str | None
    stop_condition: str | None
    trajectory: list[TrajectoryStep]
    tool_defs: list[Tool] | None
Conversion:
from verifiers.utils.save_utils import state_to_output

output = state_to_output(
    state,
    state_columns=["custom_field"]  # include custom state fields
)

Generation and Evaluation

Generate Method

results = await env.generate(
    inputs=dataset,
    client=client,
    model="gpt-4",
    sampling_args={"temperature": 0.7},
    max_concurrent=10,
    save_results=True,
    results_path=Path("./results"),
    state_columns=["game_board", "score"],  # save custom fields
    independent_scoring=False,  # use group scoring (default)
)
Returns:
class GenerateOutputs(TypedDict):
    outputs: list[RolloutOutput]
    metadata: GenerateMetadata

Evaluate Method

results = await env.evaluate(
    client=client,
    model="gpt-4",
    num_examples=100,
    rollouts_per_example=4,  # 400 total rollouts
    max_concurrent=10,
    save_results=True
)

print(f"Average reward: {results['metadata']['avg_reward']}")
print(f"Pass@1: {results['metadata']['pass_at_k']['1']}")
print(f"Pass@4: {results['metadata']['pass_at_k']['4']}")
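For reference, pass@k is commonly computed with the unbiased estimator from the HumanEval paper (probability that at least one of k samples out of n rollouts, c of which succeeded, is a success). The framework's metadata computation is assumed to be equivalent but may differ in detail.

```python
# Standard unbiased pass@k estimator: 1 - C(n-c, k) / C(n, k)
from math import comb

def pass_at_k(n: int, c: int, k: int) -> float:
    """n = total rollouts per example, c = successful rollouts, k = samples."""
    if n - c < k:
        return 1.0  # fewer than k failures: a success is guaranteed in any k
    return 1.0 - comb(n - c, k) / comb(n, k)

# Example: 4 rollouts per example, 2 successful
print(pass_at_k(4, 2, 1))  # 0.5
print(pass_at_k(4, 2, 4))  # 1.0
```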

Progress Callbacks

Monitor generation progress:
async def on_progress(all_outputs, new_outputs, metadata):
    print(f"Completed {len(all_outputs)} rollouts")
    print(f"Current avg reward: {metadata['avg_reward']}")

results = await env.evaluate(
    client=client,
    model="gpt-4",
    on_progress=on_progress  # called after each group
)
Callback signature:
ProgressCallback = Callable[
    [
        list[RolloutOutput],      # all_outputs
        list[RolloutOutput],      # new_outputs (just completed)
        GenerateMetadata,         # metadata (updated)
    ],
    None
]

Result Persistence

Saving Results

results = await env.evaluate(
    client=client,
    model="gpt-4",
    save_results=True,
    results_path=Path("./results/experiment_1")
)

# Creates:
# ./results/experiment_1/
#   ├── outputs.jsonl        # RolloutOutput per line
#   └── metadata.json        # GenerateMetadata
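Saved results can be read back with a few lines of standard-library code. This loader is a sketch assuming the layout above (one JSON-encoded RolloutOutput per line in outputs.jsonl); it is not a framework API.

```python
# Illustrative loader for the saved outputs.jsonl (one JSON object per line)
import json
from pathlib import Path

def load_outputs(results_path: Path) -> list[dict]:
    outputs = []
    with open(results_path / "outputs.jsonl") as f:
        for line in f:
            if line.strip():  # skip blank lines
                outputs.append(json.loads(line))
    return outputs
```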

Resuming Evaluations

# First run (partial completion)
results = await env.evaluate(
    client=client,
    model="gpt-4",
    num_examples=100,
    save_results=True,
    results_path=Path("./results/run_1")
)
# Completes 60 rollouts, then crashes

# Resume from checkpoint
results = await env.evaluate(
    client=client,
    model="gpt-4",
    num_examples=100,
    save_results=True,
    results_path=Path("./results/run_1")  # same path
)
# Completes remaining 40 rollouts

Pushing to HuggingFace Hub

results = await env.evaluate(
    client=client,
    model="gpt-4",
    save_results=True,
    push_to_hf_hub=True,
    hf_hub_dataset_name="username/experiment-results"
)
When resuming evaluations, the framework validates that the environment configuration (env_id, model, num_examples, rollouts_per_example) matches the saved metadata, preventing results from different runs from being mixed accidentally.

Advanced Rollout Customization

Custom Prompt Messages

Override get_prompt_messages() for non-linear conversations:
async def get_prompt_messages(self, state: State) -> Messages:
    """Custom message construction."""
    if state.get("special_mode"):
        # Use alternative prompt format
        return build_special_prompt(state)
    return await super().get_prompt_messages(state)

Custom Completion Rendering

Override render_completion() for custom output extraction:
async def render_completion(self, state: State):
    """Extract only final answer, not full conversation."""
    if state["trajectory"]:
        last_turn = state["trajectory"][-1]
        parsed = self.parser.parse(last_turn["completion"][-1]["content"])
        state["completion"] = [{"role": "assistant", "content": parsed.answer}]
    else:
        state["completion"] = []

Intermediate Rewards

Set per-turn rewards via add_trajectory_step():
async def add_trajectory_step(self, state: State, trajectory_step: TrajectoryStep):
    # Reward intermediate progress
    if self.check_intermediate_goal(state):
        trajectory_step["reward"] = 0.3
    elif self.check_mistake(state):
        trajectory_step["reward"] = -0.5
    
    await super().add_trajectory_step(state, trajectory_step)
Intermediate rewards are an advanced feature, primarily used for RL training. For most evaluation use cases, the final reward computed by the rubric is sufficient.
