Installation
Quick Start
import asyncio
from opensandbox.sandbox import Sandbox
from opensandbox.config import ConnectionConfig
from opensandbox.exceptions import SandboxException
async def main():
    """Run one shell command in a fresh "ubuntu" sandbox and print its output.

    Sandbox-specific failures are printed with their error code and message;
    any other exception is printed as a generic error. Nothing is re-raised.
    """
    # 1. Configure connection
    config = ConnectionConfig(
        domain="api.opensandbox.io",
        api_key="your-api-key",
    )

    # 2. Create a Sandbox
    try:
        sandbox = await Sandbox.create(
            "ubuntu",
            connection_config=config,
        )
        async with sandbox:
            # 3. Execute a shell command
            execution = await sandbox.commands.run("echo 'Hello Sandbox!'")

            # 4. Print output (first captured stdout entry)
            print(execution.logs.stdout[0].text)

            # 5. Cleanup
            await sandbox.kill()
    except SandboxException as e:
        print(f"Sandbox Error: [{e.error.code}] {e.error.message}")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(main())
from datetime import timedelta
import httpx
from opensandbox import SandboxSync
from opensandbox.config import ConnectionConfigSync
# Synchronous variant of the quick start: same flow, blocking API.
config = ConnectionConfigSync(
    domain="api.opensandbox.io",
    api_key="your-api-key",
    request_timeout=timedelta(seconds=30),
    transport=httpx.HTTPTransport(limits=httpx.Limits(max_connections=20)),
)

sandbox = SandboxSync.create("ubuntu", connection_config=config)
with sandbox:
    execution = sandbox.commands.run("echo 'Hello Sandbox!'")
    # Print the first captured stdout entry
    print(execution.logs.stdout[0].text)
    sandbox.kill()
Core Features
Lifecycle Management
Manage the complete sandbox lifecycle with renewal, pausing, and resuming capabilities.
from datetime import timedelta
# Renew the sandbox
# This resets the expiration time to (current time + duration)
await sandbox.renew(timedelta( minutes = 30 ))
# Pause execution (suspends all processes)
await sandbox.pause()
# Resume execution
sandbox = await Sandbox.resume(
sandbox_id = sandbox.id,
connection_config = config,
)
# Get current status
info = await sandbox.get_info()
print ( f "State: { info.status.state } " )
Custom Health Check
Define custom logic to determine if the sandbox is healthy. This overrides the default ping check.
async def custom_health_check(sbx: Sandbox) -> bool:
    """Report whether the sandbox should be considered healthy.

    Args:
        sbx: The sandbox being checked.

    Returns:
        True when the endpoint lookup for port 80 succeeds, False when any
        exception is raised while checking.
    """
    try:
        # 1. Get the external mapped address for port 80
        endpoint = await sbx.get_endpoint(80)

        # 2. Perform your connection check (e.g. HTTP request, Socket connect)
        # return await check_connection(endpoint.endpoint)
        return True
    except Exception:
        # Any failure means "not healthy yet" rather than an error.
        return False
sandbox = await Sandbox.create(
"nginx:latest" ,
connection_config = config,
health_check = custom_health_check # Custom check: Wait for port 80 to be accessible
)
Command Execution & Streaming
Execute commands and handle output streams in real-time.
from opensandbox.models.execd import ExecutionHandlers, RunCommandOpts
# Define async handlers for streaming output
async def handle_stdout(msg):
    """Print the text of one stdout message, prefixed with "STDOUT: "."""
    print(f"STDOUT: {msg.text}")
async def handle_stderr(msg):
    """Print the text of one stderr message, prefixed with "STDERR: "."""
    print(f"STDERR: {msg.text}")
async def handle_complete(complete):
    """Print the execution time reported by the completion event."""
    print(f"Command finished in {complete.execution_time_in_millis} ms")
# Create handlers (all handlers must be async)
handlers = ExecutionHandlers(
    on_stdout=handle_stdout,
    on_stderr=handle_stderr,
    on_execution_complete=handle_complete,
)
# Execute command with handlers
result = await sandbox.commands.run(
"for i in {1..5}; do echo \" Count $i \" ; sleep 0.5; done" ,
handlers = handlers
)
File Operations
Comprehensive file and directory management including read, write, search, and delete.
from opensandbox.models.filesystem import WriteEntry, SearchEntry
# 1. Write file
await sandbox.files.write_files([
WriteEntry(
path = "/tmp/hello.txt" ,
data = "Hello World" ,
mode = 644
)
])
# 2. Read file
content = await sandbox.files.read_file( "/tmp/hello.txt" )
print ( f "Content: { content } " )
# 3. List/Search files
files = await sandbox.files.search(
SearchEntry(
path = "/tmp" ,
pattern = "*.txt"
)
)
for f in files:
print ( f "Found: { f.path } " )
# 4. Delete file
await sandbox.files.delete_files([ "/tmp/hello.txt" ])
Sandbox Management (Admin)
Use SandboxManager for administrative tasks and finding existing sandboxes.
from opensandbox.manager import SandboxManager
from opensandbox.models.sandboxes import SandboxFilter
# Create manager using async context manager
async with await SandboxManager.create( connection_config = config) as manager:
# List running sandboxes
sandboxes = await manager.list_sandbox_infos(
SandboxFilter(
states = [ "RUNNING" ],
page_size = 10
)
)
for info in sandboxes.sandbox_infos:
print ( f "Found sandbox: { info.id } " )
# Perform admin actions
await manager.kill_sandbox(info.id)
Configuration
Connection Configuration
The ConnectionConfig class manages API server connection settings.
| Parameter | Description | Default | Environment Variable |
| --- | --- | --- | --- |
| api_key | API Key for authentication | Required | OPEN_SANDBOX_API_KEY |
| domain | The endpoint domain of the sandbox service | Required (or localhost:8080) | OPEN_SANDBOX_DOMAIN |
| protocol | HTTP protocol (http/https) | http | - |
| request_timeout | Timeout for API requests | 30 seconds | - |
| debug | Enable debug logging for HTTP requests | False | - |
| headers | Custom HTTP headers | Empty | - |
| transport | Shared httpx transport (pool/proxy/retry) | SDK-created per instance | - |
| use_server_proxy | Use sandbox server as proxy for execd/endpoint requests | False | - |
from datetime import timedelta
# Basic configuration with a 60-second request timeout
config = ConnectionConfig(
    api_key="your-key",
    domain="api.opensandbox.io",
    request_timeout=timedelta(seconds=60),
)
from datetime import timedelta
import httpx
# Custom headers and custom transport
# If you create many Sandbox instances, configuring a shared transport
# is recommended to optimize resource usage.
config = ConnectionConfig(
    api_key="your-key",
    domain="api.opensandbox.io",
    headers={"X-Custom-Header": "value"},
    transport=httpx.AsyncHTTPTransport(
        limits=httpx.Limits(
            max_connections=100,
            max_keepalive_connections=50,
            keepalive_expiry=30.0,
        )
    ),
)

# If you provide a custom transport, you are responsible for closing it:
# await config.transport.aclose()
Sandbox Creation Configuration
The Sandbox.create() method allows configuring the sandbox environment.
| Parameter | Description | Default |
| --- | --- | --- |
| image | Docker image specification | Required |
| timeout | Automatic termination timeout | 10 minutes |
| entrypoint | Container entrypoint command | ["tail", "-f", "/dev/null"] |
| resource | CPU and memory limits | {"cpu": "1", "memory": "2Gi"} |
| env | Environment variables | Empty |
| metadata | Custom metadata tags | Empty |
| network_policy | Optional outbound network policy (egress) | - |
| ready_timeout | Max time to wait for sandbox to be ready | 30 seconds |
from datetime import timedelta
from opensandbox.models.sandboxes import NetworkPolicy, NetworkRule
sandbox = await Sandbox.create(
"python:3.11" ,
connection_config = config,
timeout = timedelta( minutes = 30 ),
resource = { "cpu" : "2" , "memory" : "4Gi" },
env = { "PYTHONPATH" : "/app" },
metadata = { "project" : "demo" },
network_policy = NetworkPolicy(
defaultAction = "deny" ,
egress = [NetworkRule( action = "allow" , target = "pypi.org" )],
),
)
API Reference
Sandbox
Main class for interacting with sandboxes.
Methods
Sandbox.create(...) — Creates and initializes a new sandbox instance. Parameters:
image (str): Docker image to use
connection_config (ConnectionConfig): Connection configuration
timeout (timedelta, optional): Sandbox lifetime
entrypoint (List[str], optional): Custom entrypoint
resource (Dict[str, str], optional): CPU/memory limits
env (Dict[str, str], optional): Environment variables
metadata (Dict[str, str], optional): Custom metadata
network_policy (NetworkPolicy, optional): Network egress rules
health_check (Callable, optional): Custom health check function
ready_timeout (timedelta, optional): Readiness timeout
Returns: Sandbox instance
Sandbox.resume(...) — Resumes a paused sandbox. Parameters:
sandbox_id (str): ID of the paused sandbox
connection_config (ConnectionConfig): Connection configuration
Returns: Sandbox instance
sandbox.commands — Access to command execution operations. Returns: CommandsClient
sandbox.files — Access to file system operations. Returns: FilesClient
sandbox.renew(duration) — Extends the sandbox expiration time. Parameters:
duration (timedelta): Extension duration
sandbox.pause() — Pauses the sandbox (suspends all processes).
sandbox.kill() — Terminates the sandbox immediately.
sandbox.get_info() — Retrieves current sandbox information. Returns: SandboxInfo
sandbox.get_endpoint(port) — Gets the external endpoint for a port. Parameters:
port (int): Internal port number
Returns: SandboxEndpoint
Examples
Running a Python Script
from opensandbox.models.filesystem import WriteEntry
# Write a Python script
await sandbox.files.write_files([
WriteEntry(
path = "/app/script.py" ,
data = """
import sys
print(f"Python {sys.version} ")
print("Hello from OpenSandbox!")
""" ,
mode = 755
)
])
# Execute it
execution = await sandbox.commands.run( "python /app/script.py" )
for line in execution.logs.stdout:
print (line.text)
Network Policy Example
from opensandbox.models.sandboxes import NetworkPolicy, NetworkRule
# Create sandbox with restricted network access
sandbox = await Sandbox.create(
"python:3.11" ,
connection_config = config,
network_policy = NetworkPolicy(
defaultAction = "deny" ,
egress = [
NetworkRule( action = "allow" , target = "pypi.org" ),
NetworkRule( action = "allow" , target = "github.com" ),
],
),
)
Port Forwarding
# Start a web server in the sandbox
await sandbox.commands.run(
"python -m http.server 8000" ,
background = True
)
# Get the external endpoint
endpoint = await sandbox.get_endpoint( 8000 )
print ( f "Access your server at: http:// { endpoint.endpoint } " )
Error Handling
from opensandbox.exceptions import (
SandboxException,
SandboxNotFoundError,
SandboxTimeoutError
)
try :
sandbox = await Sandbox.create( "ubuntu" , connection_config = config)
execution = await sandbox.commands.run( "ls -la" )
except SandboxTimeoutError as e:
print ( f "Timeout: { e.error.message } " )
except SandboxNotFoundError as e:
print ( f "Not found: { e.error.message } " )
except SandboxException as e:
print ( f "Error [ { e.error.code } ]: { e.error.message } " )
Next Steps
Code Interpreter: Execute Python, Java, Go code with state persistence
API Reference: Explore detailed API documentation
Examples: Browse more code examples
Configuration: Advanced configuration options