
RolloutGatewayMixin

Opt-in mixin that replaces CliAgentEnv’s client-side interception with a server-side gateway path, allowing agents to communicate directly with prime-rl’s rollout gateway.
Warning: RolloutGatewayMixin is experimental, and its API may change in breaking ways in future releases.

Overview

When the gateway is active, agents talk directly to prime-rl’s rollout gateway through a Prime Tunnel. The environment only manages the sandbox lifecycle and fetches the trajectory after completion. When the gateway is inactive, the mixin falls through to CliAgentEnv’s standard interception path.
Key differences from standard CliAgentEnv:
  • Agent makes API calls directly to the gateway server (not intercepted by a local proxy)
  • Environment registers/unregisters rollouts with the gateway
  • Trajectory is fetched from the gateway after agent completion
  • Requires prime-rl’s rollout gateway to be running

Method Resolution Order (MRO)

MyEnv → RolloutGatewayMixin → CliAgentEnv → SandboxMixin → MultiTurnEnv → Environment
Place the mixin before CliAgentEnv in the base-class list so that its overrides take precedence over CliAgentEnv’s methods.
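To illustrate why the order matters, here is a minimal sketch using stand-in classes. The two classes below are hypothetical stubs that only borrow the names from this page; they are not the real verifiers implementations, and the flag handling is an assumption based on the init_interception description further down.

```python
class CliAgentEnv:
    """Hypothetical stub: models only the method being overridden."""

    def __init__(self, use_gateway: bool = True) -> None:
        self.use_gateway = use_gateway
        self.interception_ready = False

    def init_interception(self) -> None:
        self.interception_ready = True


class RolloutGatewayMixin:
    """Hypothetical stub of the mixin's conditional override."""

    def init_interception(self) -> None:
        # Fall through to the next class in the MRO only when the gateway is off.
        if not self.use_gateway:
            super().init_interception()


class MyEnv(RolloutGatewayMixin, CliAgentEnv):
    pass


mro_names = [cls.__name__ for cls in MyEnv.__mro__]

gateway_env = MyEnv(use_gateway=True)
gateway_env.init_interception()

local_env = MyEnv(use_gateway=False)
local_env.init_interception()
```

If the base classes were listed in the opposite order, CliAgentEnv.init_interception would be found first and the mixin's override would never run.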

Usage

Basic Setup

import verifiers as vf
from verifiers.envs.experimental.rollout_gateway_mixin import RolloutGatewayMixin
from verifiers.envs.experimental.cli_agent_env import CliAgentEnv

class MyGatewayEnv(RolloutGatewayMixin, CliAgentEnv):
    def __init__(self, **kwargs):
        super().__init__(
            run_command="python /app/agent.py",
            docker_image="python:3.11",
            **kwargs
        )
        # Initialize gateway resources
        self.init_gateway(
            gateway_port=8000,
            timeout_seconds=21600.0
        )

def load_environment():
    dataset = vf.Environment.make_dataset([
        {"question": "Write a function to sort a list"}
    ])
    
    return MyGatewayEnv(
        dataset=dataset,
        rubric=vf.Rubric(lambda **kw: 1.0),
        use_gateway=True  # Enable gateway mode
    )

Disabling Gateway Mode

# Use standard CliAgentEnv interception instead
env = MyGatewayEnv(
    dataset=dataset,
    rubric=rubric,
    use_gateway=False  # Disable gateway, use standard interception
)

Attributes

use_gateway (bool, default: True): Toggle gateway mode. When True, uses the server-side gateway. When False, falls through to CliAgentEnv interception.
gateway_port (int, default: 8000): Port where the rollout gateway server is listening.

Methods

init_gateway

def init_gateway(
    self,
    gateway_port: int = 8000,
    timeout_seconds: float = 21600.0,
)
Initializes gateway resources. Call it from __init__ when use_gateway=True.
gateway_port (int, default: 8000): Port for the rollout gateway server.
timeout_seconds (float, default: 21600.0): HTTP timeout for gateway requests, in seconds (6 hours by default).
Initializes:
  • HTTP client with configured timeout
  • Tunnel management dict
  • Tunnel lock for thread-safe access
  • Tunnel monitor task reference

init_interception

def init_interception(self, *args, **kwargs)
Overrides CliAgentEnv.init_interception(). Calls the parent implementation only when use_gateway=False.

register_rollout

async def register_rollout(self, state: State) -> None
Registers the rollout with the gateway server.
state (State, required): Current rollout state.
Sends to gateway:
  • Model name
  • Sampling parameters
  • Max turns
  • Max sequence length
Endpoint: POST /v1/rollouts/{rollout_id}/register

unregister_rollout

async def unregister_rollout(self, state: State) -> None
Unregisters the rollout from the gateway server.
state (State, required): Current rollout state.
Endpoint: POST /v1/rollouts/{rollout_id}/unregister

fetch_trajectory

async def fetch_trajectory(self, state: State) -> None
Fetches the trajectory from the gateway after agent completion.
state (State, required): Current rollout state. Updated with trajectory data.
Updates state with:
  • trajectory: List of conversation turns
  • prompt: Final prompt messages
  • completion: Final completion messages
  • is_truncated: Whether any turn was truncated
Endpoint: GET /v1/rollouts/{rollout_id}/trajectory
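The three endpoints documented for register_rollout, unregister_rollout, and fetch_trajectory share one URL prefix. The sketch below shows how those URLs could be assembled; the helper name rollout_endpoints and the example gateway address are hypothetical and are not part of the mixin.

```python
def rollout_endpoints(gateway_url: str, rollout_id: str) -> dict[str, tuple[str, str]]:
    """Return (HTTP method, URL) pairs for the documented rollout endpoints."""
    # Strip a trailing slash so the joined path never contains "//".
    base = f"{gateway_url.rstrip('/')}/v1/rollouts/{rollout_id}"
    return {
        "register": ("POST", f"{base}/register"),
        "unregister": ("POST", f"{base}/unregister"),
        "trajectory": ("GET", f"{base}/trajectory"),
    }


# Example values are assumptions chosen for illustration only.
endpoints = rollout_endpoints("http://localhost:8000/", "rollout_abc123")
```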

build_env_vars

async def build_env_vars(self, state: State) -> dict[str, str]
Overrides the parent method to set OPENAI_BASE_URL from rollout_base_url in gateway mode.
state (State, required): Current rollout state.
Returns: Environment variables dict with:
  • OPENAI_BASE_URL: Points to gateway rollout endpoint
  • OPENAI_MODEL: Model name from state
  • OPENAI_TIMEOUT: Set to "600"
  • OPENAI_REQUEST_TIMEOUT: Set to "600"
  • HTTPX_TIMEOUT: Set to "600"
  • Plus any variables from self.environment_vars

get_gateway_tunnel_url

async def get_gateway_tunnel_url(self, local_addr: str | None = None) -> str
Get or create a Prime Tunnel for the gateway connection. Automatically restarts dead tunnels.
local_addr (str | None, default: None): Local address for the tunnel. Required when starting the first tunnel or when multiple tunnels are active.
Returns: Tunnel URL (e.g., "https://xxx.prime-tunnel.com").
Behavior:
  • Creates new tunnel if none exists for local_addr
  • Reuses existing tunnel if alive
  • Restarts dead tunnels automatically
  • Starts health monitor on first tunnel creation

start_agent

async def start_agent(self, state: State) -> None
Starts the agent command as a background job. In gateway mode, skips the background completion monitoring task (handled by wait_for_agent_completion).
state (State, required): Current rollout state.
Updates state:
  • background_job: Background job handle
  • agent_start_time: Start timestamp
  • agent_completed: Set to False

poll_job_completion

async def poll_job_completion(
    self,
    state: State,
    sandbox_id: str,
    background_job,
) -> None
Polls until background job completes, capturing output and monitoring tunnel health.
state (State, required): Current rollout state.
sandbox_id (str, required): Prime Sandbox ID.
background_job (required): Background job handle from the sandbox client.
Updates state on completion:
  • agent_exit_code: Process exit code
  • agent_stdout: Captured stdout
  • agent_stderr: Captured stderr
Raises:
  • TunnelError if tunnel dies during polling

wait_for_agent_completion

async def wait_for_agent_completion(self, state: State) -> None
Waits for agent completion with timeout.
state (State, required): Current rollout state.
Updates state:
  • agent_completed: Set to True when done
  • agent_timed_out: Set to True if timeout exceeded
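A minimal sketch of a timeout-bounded wait follows, using asyncio.wait_for. The function and the poll callable are hypothetical, and marking agent_completed in both the success and timeout cases is an assumption about how the rollout proceeds afterwards.

```python
import asyncio
from typing import Any, Awaitable, Callable

State = dict[str, Any]


async def wait_for_completion(
    state: State,
    poll: Callable[[State], Awaitable[None]],
    timeout_seconds: float,
) -> None:
    """Wait for the polling coroutine, recording a timeout instead of raising."""
    state["agent_timed_out"] = False
    try:
        await asyncio.wait_for(poll(state), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        state["agent_timed_out"] = True
    finally:
        # Assumption: the rollout is treated as finished either way.
        state["agent_completed"] = True


async def fast_poll(state: State) -> None:
    await asyncio.sleep(0)


async def slow_poll(state: State) -> None:
    await asyncio.sleep(1.0)


async def demo() -> tuple[State, State]:
    finished: State = {}
    timed_out: State = {}
    await wait_for_completion(finished, fast_poll, timeout_seconds=1.0)
    await wait_for_completion(timed_out, slow_poll, timeout_seconds=0.01)
    return finished, timed_out


finished_state, timed_out_state = asyncio.run(demo())
```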

rollout

async def rollout(
    self,
    input: RolloutInput,
    client: Client,
    model: str,
    sampling_args: SamplingArgs | None = None,
) -> State
Main rollout method. When use_gateway=True, orchestrates gateway-based rollout. Otherwise, delegates to parent CliAgentEnv.rollout().
input (RolloutInput, required): Rollout input data.
client (Client, required): LLM client (its base URL is used to determine the gateway URL).
model (str, required): Model identifier.
sampling_args (SamplingArgs | None, default: None): Sampling parameters.
Returns: Final rollout state.
Gateway mode flow:
  1. Initialize state
  2. Register rollout with gateway
  3. Resolve tunnel local address
  4. Start or reuse Prime Tunnel
  5. Create sandbox with OPENAI_BASE_URL pointing to gateway
  6. Start agent
  7. Wait for agent completion
  8. Fetch trajectory from gateway
  9. Cleanup (unregister, destroy sandbox)

teardown_gateway

@vf.teardown
async def teardown_gateway(self)
Teardown hook that closes the HTTP client, stops tunnels, and cancels the health monitor. Decorated with @vf.teardown so that it runs automatically.
Cleans up:
  • HTTP client connection
  • All active Prime Tunnels
  • Tunnel health monitor task

State Keys

Gateway mode adds these state keys:
rollout_id (str): Unique identifier for the rollout (format: "rollout_{uuid}").
gateway_url (str): Base URL of the gateway server (derived from the client base URL).
rollout_base_url (str): Full rollout endpoint URL: {tunnel_url}/v1/rollouts/{rollout_id}.
tunnel_url (str): Prime Tunnel URL.
tunnel_local_addr (str): Local address for the tunnel connection.
tunnel_id (str | None): Prime Tunnel ID for debugging.
Plus all state keys from CliAgentEnv:
sandbox_id (str): Prime Sandbox ID.
background_job: Background job handle.
agent_completed (bool): Whether the agent process finished.
agent_exit_code (int): Agent process exit code.
agent_stdout (str): Captured stdout.
agent_stderr (str): Captured stderr.
agent_timed_out (bool): Whether the agent exceeded the timeout.

Tunnel Health Monitoring

The mixin automatically monitors tunnel health in the background:
async def _tunnel_health_monitor(self, interval: float = 30.0) -> None
  • Runs every 30 seconds by default
  • Detects dead tunnels via tunnel.is_running
  • Automatically restarts dead tunnels
  • Logs frpc output for debugging
  • Started lazily on first tunnel creation
  • Cancelled on teardown
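The monitoring loop described above follows a common asyncio pattern: a background task that sleeps, checks, and is cancelled at teardown. The sketch below uses a hypothetical FakeTunnel with a restart method; the real tunnel API and the mixin's internals may differ.

```python
import asyncio


class FakeTunnel:
    """Hypothetical stand-in exposing is_running and a restart coroutine."""

    def __init__(self) -> None:
        self.is_running = True
        self.restarts = 0

    async def restart(self) -> None:
        self.restarts += 1
        self.is_running = True


async def tunnel_health_monitor(tunnels: dict[str, FakeTunnel], interval: float = 30.0) -> None:
    """Periodically restart any tunnel that is no longer running."""
    while True:
        await asyncio.sleep(interval)
        for tunnel in list(tunnels.values()):
            if not tunnel.is_running:
                await tunnel.restart()


async def demo() -> tuple[int, bool, bool]:
    tunnel = FakeTunnel()
    tunnels = {"127.0.0.1:8000": tunnel}
    # Short interval so the example finishes quickly.
    monitor = asyncio.create_task(tunnel_health_monitor(tunnels, interval=0.01))
    tunnel.is_running = False  # simulate a dead tunnel
    await asyncio.sleep(0.1)
    monitor.cancel()  # what a teardown hook would do
    try:
        await monitor
    except asyncio.CancelledError:
        pass
    return tunnel.restarts, tunnel.is_running, monitor.cancelled()


restarts, running, cancelled = asyncio.run(demo())
```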

Error Handling

Tunnel Errors

import verifiers as vf

try:
    state = await env.rollout(...)
except vf.TunnelError as e:
    # The error message contains frpc output for debugging
    print(f"Tunnel failed: {e}")

Gateway Errors

import httpx

# HTTP errors from the gateway are raised via httpx
try:
    state = await env.rollout(...)
except httpx.HTTPStatusError as e:
    print(f"Gateway returned {e.response.status_code}")

Cleanup Guarantees

The mixin ensures cleanup even on errors:
  • Unregister rollout (if registered)
  • Destroy sandbox (if created)
  • Errors during cleanup are logged but don’t raise
  • Any cleanup error is captured in state["error"]
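One way to get these guarantees is a best-effort loop that logs each failure and keeps going. The helper below is a hypothetical sketch, not the mixin's code; storing only the first error in state["error"] is an assumption.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("cleanup_sketch")  # hypothetical logger name

State = dict[str, Any]
CleanupStep = Callable[[State], Awaitable[None]]


async def best_effort_cleanup(state: State, steps: list[tuple[str, CleanupStep]]) -> None:
    """Run every cleanup step; log failures and record them without raising."""
    for name, step in steps:
        try:
            await step(state)
        except Exception as exc:
            logger.warning("cleanup step %s failed: %r", name, exc)
            # Assumption: keep the first error rather than overwriting it.
            state.setdefault("error", exc)


async def unregister(state: State) -> None:
    raise RuntimeError("gateway unreachable")  # simulated failure


async def destroy_sandbox(state: State) -> None:
    state["sandbox_destroyed"] = True


async def demo() -> State:
    state: State = {}
    await best_effort_cleanup(
        state, [("unregister_rollout", unregister), ("destroy_sandbox", destroy_sandbox)]
    )
    return state


cleanup_state = asyncio.run(demo())
```

The second step still runs after the first one fails, which is the property the list above describes.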

Logging

The mixin provides detailed structured logging:
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("verifiers.envs.experimental.rollout_gateway_mixin")
logger.setLevel(logging.DEBUG)
Log stages:
  • stage=start: Rollout initiated
  • stage=register_rollout: Gateway registration
  • stage=resolve_tunnel_local_addr: Tunnel address resolution
  • stage=start_tunnel: Tunnel creation
  • stage=create_sandbox: Sandbox provisioning
  • stage=start_agent: Agent launch
  • stage=wait_for_agent_completion: Agent monitoring
  • stage=fetch_trajectory: Trajectory retrieval
  • stage=tunnel_died: Tunnel failure
  • stage=agent_completed: Agent exit
  • stage=finish: Rollout completion
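To follow a rollout through these stages programmatically, a custom handler can pull the stage token out of each message. The sketch assumes log messages contain a literal stage=<name> token as listed above; the exact message format is not documented here, and the sample messages are invented for the demonstration.

```python
import logging
import re

STAGE_PATTERN = re.compile(r"\bstage=(\w+)")


class StageCollector(logging.Handler):
    """Collect the stage names seen in log messages, in order."""

    def __init__(self) -> None:
        super().__init__(level=logging.DEBUG)
        self.stages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        match = STAGE_PATTERN.search(record.getMessage())
        if match:
            self.stages.append(match.group(1))


logger = logging.getLogger("verifiers.envs.experimental.rollout_gateway_mixin")
logger.setLevel(logging.DEBUG)
collector = StageCollector()
logger.addHandler(collector)

# Simulated messages; the real mixin emits its own.
logger.debug("rollout_id=rollout_demo stage=start")
logger.debug("rollout_id=rollout_demo stage=register_rollout")
logger.debug("message without a stage token")

seen_stages = list(collector.stages)
```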

Advanced Example

import verifiers as vf
from verifiers.envs.experimental.rollout_gateway_mixin import RolloutGatewayMixin
from verifiers.envs.experimental.cli_agent_env import CliAgentEnv
import logging

logging.basicConfig(level=logging.INFO)

class HarborGatewayEnv(RolloutGatewayMixin, CliAgentEnv):
    def __init__(self, **kwargs):
        super().__init__(
            run_command="python /app/solve.py",
            docker_image="python:3.11",
            timeout_seconds=7200,  # 2 hours
            max_turns=100,
            **kwargs
        )
        self.init_gateway(
            gateway_port=8000,
            timeout_seconds=7200.0
        )
    
    async def post_sandbox_setup(self, state: vf.State) -> None:
        """Upload agent code after sandbox creation."""
        sandbox_id = state["sandbox_id"]
        
        # Upload solver
        await self.sandbox_client.upload_file(
            sandbox_id,
            "/app/solve.py",
            "./agents/solver.py"
        )
        
        # Install dependencies
        await self.sandbox_client.execute_command(
            sandbox_id,
            "pip install openai numpy",
            working_dir="/app"
        )
    
    async def build_env_vars(self, state: vf.State) -> dict[str, str]:
        """Add custom environment variables."""
        env_vars = await super().build_env_vars(state)
        
        # Pass task metadata to agent
        info = state.get("info", {})
        if "instance_id" in info:
            env_vars["HARBOR_INSTANCE_ID"] = info["instance_id"]
        if "repo_name" in info:
            env_vars["HARBOR_REPO"] = info["repo_name"]
        
        return env_vars

def load_environment():
    dataset = vf.Environment.make_dataset([
        {
            "question": "Fix the bug in test_api.py",
            "info": {
                "instance_id": "repo-123",
                "repo_name": "test-repo"
            }
        }
    ])
    
    def success_rate(completion: vf.Messages, info: dict, **kwargs) -> float:
        # Reward function using agent output
        return 1.0 if len(completion) > 0 else 0.0
    
    return HarborGatewayEnv(
        dataset=dataset,
        rubric=vf.Rubric(success_rate),
        use_gateway=True
    )

When to Use Gateway Mode

Use gateway when:
  • Running distributed rollouts with prime-rl’s gateway server
  • Need server-side trajectory management
  • Want centralized rollout coordination
  • Prefer gateway-managed model inference
Use standard interception when:
  • Running local rollouts without gateway infrastructure
  • Need client-side interception for debugging
  • Want simpler setup without gateway dependencies
