Overview
A rollout is a complete interaction sequence from initial prompt to final completion, including all model responses, environment feedback, tool executions, and computed rewards. Each rollout produces a State object that tracks the full interaction history.
import verifiers as vf
# Single rollout
state = await env.rollout(
input=dataset[0],
client=client,
model="gpt-4",
sampling_args={"temperature": 0.7}
)
# state["completion"] - final response
# state["reward"] - computed reward
# state["trajectory"] - turn-by-turn history
Rollout Lifecycle
Every rollout follows this sequence:
1. State Initialization
state = await env.init_state(
input=RolloutInput({
"prompt": [{"role": "user", "content": "What is 2+2?"}],
"answer": "4",
"example_id": 0,
"task": "math-qa"
}),
client=client,
model="gpt-4",
sampling_args={"temperature": 0.7}
)
Initial state fields:
State({
# Input data (forwarded from input)
"prompt": [...],
"answer": "4",
"example_id": 0,
"task": "math-qa",
"info": {},
# Configuration
"client": client,
"model": "gpt-4",
"sampling_args": {"temperature": 0.7},
"tool_defs": [],
# Status
"is_completed": False,
"is_truncated": False,
# Outputs (populated during rollout)
"trajectory": [],
"completion": None,
"trajectory_id": "a3f9c2e1...", # unique rollout ID
# Scoring (populated after rollout)
"reward": None,
"advantage": None,
"metrics": None,
"error": None,
"final_env_response": None,
# Timing
"timing": {
"start_time": 1234567890.123,
"generation_ms": 0.0,
"scoring_ms": 0.0,
"total_ms": 0.0
},
# Usage tracking
"usage": {"input_tokens": 0.0, "output_tokens": 0.0}
})
2. Setup State (Per-Rollout Initialization)
async def setup_state(self, state: State) -> State:
"""Override to initialize per-rollout resources."""
state["game_board"] = initialize_board()
state["score"] = 0
state["session_id"] = await create_session()
return await super().setup_state(state)
Called before the rollout loop begins. Used for:
- Creating per-rollout resources (sandbox sessions, game state)
- Initializing environment-specific state fields
- Queueing asynchronous setup (e.g., sandbox provisioning)
3. Rollout Loop
For MultiTurnEnv and subclasses:
while not await env.is_completed(state):
# Get prompt messages
prompt_messages = await env.get_prompt_messages(state)
# Check for early termination signal
if state.get("final_env_response") is not None:
continue
# Get model response
response = await env.get_model_response(state, prompt_messages)
# Add to trajectory
await env.add_model_response(state, prompt_messages, response)
Turn sequence:
get_prompt_messages() - Build messages for model
get_model_response() - Call model API
add_model_response() - Record in trajectory
is_completed() - Check all stop conditions
4. Completion Rendering
await env.render_completion(state)
# Sets state["completion"] from trajectory
Extracts final conversation from trajectory. Default behavior:
state["completion"] = full_conversation[len(prompt_messages):]
5. Cleanup
@vf.cleanup
async def cleanup_sandbox(self, state: State):
await destroy_sandbox(state["session_id"])
@vf.cleanup
async def save_game_log(self, state: State):
await save_log(state["game_id"], state["trajectory"])
All @vf.cleanup handlers are called after is_completed() returns True.
6. Scoring
# Individual scoring
await env.rubric.score_rollout(state)
# Group scoring (default for evaluation)
await env.rubric.score_group([state1, state2, state3, state4])
Sets state["reward"], state["advantage"], and state["metrics"].
State Object
Type Definition
from verifiers.types import State
class State(dict):
"""Dict subclass with input field forwarding."""
INPUT_FIELDS = ["prompt", "answer", "task", "info", "example_id"]
Field Forwarding
Accessing any field listed in INPUT_FIELDS automatically forwards to the nested input object:
state = State(input={
"prompt": [...],
"answer": "4",
"example_id": 0
})
# These are equivalent:
state["prompt"] # forwards to state["input"]["prompt"]
state["answer"] # forwards to state["input"]["answer"]
Usage Tracking
# Automatically tracked during model calls
state["usage"] = {
"input_tokens": 150.0,
"output_tokens": 80.0
}
# Access in reward functions
async def token_efficiency(state) -> float:
total = state["usage"]["input_tokens"] + state["usage"]["output_tokens"]
return 1.0 if total < 500 else 0.5
Custom State Fields
Environments can add arbitrary fields:
async def setup_state(self, state: State) -> State:
state["game_board"] = [[0] * 8 for _ in range(8)]
state["player_position"] = (0, 0)
state["inventory"] = []
return await super().setup_state(state)
async def env_response(self, messages, state) -> vf.Messages:
# Use custom state fields
board = state["game_board"]
position = state["player_position"]
# ...
Trajectory
The trajectory field records every turn in multi-turn rollouts:
TrajectoryStep Type
from verifiers.types import TrajectoryStep
class TrajectoryStep(TypedDict):
prompt: Messages # Input messages for this turn
completion: Messages # Model output for this turn
response: Response # Raw API response
tokens: TrajectoryStepTokens | None # Token-level data for training
reward: float | None # Per-turn reward (usually None, set in add_trajectory_step)
advantage: float | None # Per-turn advantage (usually None)
is_truncated: bool # Whether generation was truncated
trajectory_id: str # Rollout UUID
extras: dict[str, Any] # Custom per-turn metadata
Accessing Trajectory
state["trajectory"] # list[TrajectoryStep]
# Get last turn
last_turn = state["trajectory"][-1]
last_prompt = last_turn["prompt"]
last_completion = last_turn["completion"]
# Count turns
num_turns = len(state["trajectory"])
# Iterate all turns
for turn in state["trajectory"]:
print(turn["prompt"], turn["completion"])
Custom Trajectory Handling
Override add_trajectory_step() to customize metadata:
async def add_trajectory_step(self, state: State, trajectory_step: TrajectoryStep):
# Add custom metadata
trajectory_step["extras"]["board_state"] = state["game_board"].copy()
trajectory_step["extras"]["valid_moves"] = get_valid_moves(state)
# Set intermediate rewards
if self.check_intermediate_goal(state):
trajectory_step["reward"] = 0.5
await super().add_trajectory_step(state, trajectory_step)
Token-Level Data
For RL training, trajectories include token IDs and logprobs:
class TrajectoryStepTokens(TypedDict):
prompt_ids: list[int] # Input token IDs
prompt_mask: list[int] # Attention mask
completion_ids: list[int] # Output token IDs
completion_mask: list[int] # Attention mask
completion_logprobs: list[float] # Log probabilities
overlong_prompt: bool # Prompt exceeded max_seq_len
is_truncated: bool # Output was truncated
routed_experts: list[list[list[int]]] | None # MoE routing (for MoE models)
Stop Conditions
Stop conditions are checked after every model response via is_completed():
Built-in Stop Conditions
MultiTurnEnv includes:
@vf.stop(priority=100) # checked first
async def has_error(self, state: State) -> bool:
return state.get("error") is not None
@vf.stop
async def prompt_too_long(self, state: State) -> bool:
return state.get("prompt_too_long", False)
@vf.stop
async def max_turns_reached(self, state: State) -> bool:
return len(state["trajectory"]) >= self.max_turns and self.max_turns > 0
@vf.stop
async def has_final_env_response(self, state: State) -> bool:
return state.get("final_env_response") is not None
ToolEnv adds:
@vf.stop
async def no_tools_called(self, state: State) -> bool:
"""Stop when model responds without tool calls."""
if not state["trajectory"]:
return False
last_completion = state["trajectory"][-1]["completion"]
return not any(msg.get("tool_calls") for msg in last_completion)
Custom Stop Conditions
class MyGameEnv(vf.MultiTurnEnv):
@vf.stop(priority=10) # check before expensive conditions
async def game_won(self, state: State) -> bool:
return state.get("won", False)
@vf.stop
async def game_lost(self, state: State) -> bool:
return state.get("lives", 1) <= 0
@vf.stop(priority=-10) # check last (expensive)
async def timeout_reached(self, state: State) -> bool:
elapsed = time.time() - state["timing"]["start_time"]
return elapsed > 300.0 # 5 minutes
Stop Condition Behavior
When any stop condition returns True:
- state["is_completed"] is set to True
- state["is_truncated"] is set if any trajectory step was truncated
- state["stop_condition"] is set to the name of the triggering condition
- Timing fields updated
- All @vf.cleanup handlers called
- Loop exits
Error Handling
Error Hierarchy
vf.Error (base)
├── vf.ModelError
│ └── vf.EmptyModelResponseError
├── vf.OverlongPromptError
├── vf.ToolError
│ ├── vf.ToolParseError
│ └── vf.ToolCallError
└── vf.InfraError
├── vf.SandboxError
└── vf.TunnelError
Automatic Error Catching
All vf.Error exceptions raised during rollout are caught and stored:
try:
response = await self.get_model_response(state, prompt_messages)
except vf.Error as e:
state["error"] = e
# Triggers has_error stop condition on next is_completed() check
env = vf.ToolEnv(
tools=[my_tool],
stop_errors=[vf.ToolParseError], # stop on parse errors
# Other tool errors returned as tool messages for model recovery
)
Error Access
if state.get("error"):
print(f"Rollout failed: {state['error']}")
print(f"Stop condition: {state['stop_condition']}") # "has_error"
Timing
Rollout timing is tracked automatically:
state["timing"] = {
"start_time": 1234567890.123, # Unix timestamp
"generation_ms": 2500.0, # Model generation time
"scoring_ms": 150.0, # Rubric scoring time
"total_ms": 2650.0 # Total rollout time
}
Timing flow:
start_time set in init_state()
generation_ms computed when stop condition triggers
scoring_ms tracked by rubric
total_ms = generation_ms + scoring_ms
RolloutInput
Input format for rollouts:
from verifiers.types import RolloutInput
class RolloutInput(TypedDict):
prompt: Messages # Required: input messages
example_id: int # Required: for grouping
task: str # Required: task identifier
answer: str # Optional: ground truth
info: Info # Optional: metadata dict
RolloutOutput
Serialized output from rollouts:
from verifiers.types import RolloutOutput
class RolloutOutput(dict):
# Always present
example_id: int
task: str
prompt: Messages | None
completion: Messages | None
reward: float
timing: RolloutTiming
is_completed: bool
is_truncated: bool
metrics: dict[str, float]
# Optional
answer: str
info: Info
error: str | None
stop_condition: str | None
trajectory: list[TrajectoryStep]
tool_defs: list[Tool] | None
Conversion:
from verifiers.utils.save_utils import state_to_output
output = state_to_output(
state,
state_columns=["custom_field"] # include custom state fields
)
Generation and Evaluation
Generate Method
results = await env.generate(
inputs=dataset,
client=client,
model="gpt-4",
sampling_args={"temperature": 0.7},
max_concurrent=10,
save_results=True,
results_path=Path("./results"),
state_columns=["game_board", "score"], # save custom fields
independent_scoring=False, # use group scoring (default)
)
Returns:
class GenerateOutputs(TypedDict):
outputs: list[RolloutOutput]
metadata: GenerateMetadata
Evaluate Method
results = await env.evaluate(
client=client,
model="gpt-4",
num_examples=100,
rollouts_per_example=4, # 400 total rollouts
max_concurrent=10,
save_results=True
)
print(f"Average reward: {results['metadata']['avg_reward']}")
print(f"Pass@1: {results['metadata']['pass_at_k']['1']}")
print(f"Pass@4: {results['metadata']['pass_at_k']['4']}")
Progress Callbacks
Monitor generation progress:
async def on_progress(all_outputs, new_outputs, metadata):
print(f"Completed {len(all_outputs)} rollouts")
print(f"Current avg reward: {metadata['avg_reward']}")
results = await env.evaluate(
client=client,
model="gpt-4",
on_progress=on_progress # called after each group
)
Callback signature:
ProgressCallback = Callable[
[
list[RolloutOutput], # all_outputs
list[RolloutOutput], # new_outputs (just completed)
GenerateMetadata, # metadata (updated)
],
None
]
Result Persistence
Saving Results
results = await env.evaluate(
client=client,
model="gpt-4",
save_results=True,
results_path=Path("./results/experiment_1")
)
# Creates:
# ./results/experiment_1/
# ├── outputs.jsonl # RolloutOutput per line
# └── metadata.json # GenerateMetadata
Resuming Evaluations
# First run (partial completion)
results = await env.evaluate(
client=client,
model="gpt-4",
num_examples=100,
save_results=True,
results_path=Path("./results/run_1")
)
# Completes 60 rollouts, then crashes
# Resume from checkpoint
results = await env.evaluate(
client=client,
model="gpt-4",
num_examples=100,
save_results=True,
results_path=Path("./results/run_1") # same path
)
# Completes remaining 40 rollouts
Pushing to HuggingFace Hub
results = await env.evaluate(
client=client,
model="gpt-4",
save_results=True,
push_to_hf_hub=True,
hf_hub_dataset_name="username/experiment-results"
)
When resuming evaluations, the framework validates that environment configuration (env_id, model, num_examples, rollouts_per_example) matches the saved metadata to prevent accidental mixing of results.
Advanced Rollout Customization
Custom Prompt Messages
Override get_prompt_messages() for non-linear conversations:
async def get_prompt_messages(self, state: State) -> Messages:
"""Custom message construction."""
if state.get("special_mode"):
# Use alternative prompt format
return build_special_prompt(state)
return await super().get_prompt_messages(state)
Custom Completion Rendering
Override render_completion() for custom output extraction:
async def render_completion(self, state: State):
"""Extract only final answer, not full conversation."""
if state["trajectory"]:
last_turn = state["trajectory"][-1]
parsed = self.parser.parse(last_turn["completion"][-1]["content"])
state["completion"] = [{"role": "assistant", "content": parsed.answer}]
else:
state["completion"] = []
Set per-turn rewards via add_trajectory_step():
async def add_trajectory_step(self, state: State, trajectory_step: TrajectoryStep):
# Reward intermediate progress
if self.check_intermediate_goal(state):
trajectory_step["reward"] = 0.3
elif self.check_mistake(state):
trajectory_step["reward"] = -0.5
await super().add_trajectory_step(state, trajectory_step)
Intermediate rewards are advanced features primarily used for RL training. For most evaluation use cases, final rewards computed by the rubric are sufficient.