Overview
Headless mode allows Hive agents to run autonomously in production without requiring terminal UI (TUI) interaction. This is essential for:
Background jobs and scheduled tasks
Webhook handlers and API endpoints
CI/CD pipelines and automated testing
Long-running daemon processes
Running Headless
Direct Runtime Execution
Create and manage the runtime directly:
import asyncio
from pathlib import Path

from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig
from framework.runtime.execution_stream import EntryPointSpec


async def main():
    """Run an agent headlessly: configure, start, trigger once, shut down."""
    # Create runtime configuration
    config = AgentRuntimeConfig(
        max_concurrent_executions=100,
        cache_ttl=60.0,
        execution_result_max=1000,
    )

    # Initialize runtime
    runtime = AgentRuntime(
        graph=my_graph,
        goal=my_goal,
        storage_path=Path("./storage"),
        llm=llm_provider,
        tools=tool_list,
        tool_executor=tool_executor,
        config=config,
    )

    # Register entry points
    runtime.register_entry_point(EntryPointSpec(
        id="api",
        name="API Handler",
        entry_node="process-request",
        trigger_type="api",
        isolation_level="shared",
        max_concurrent=10,
    ))

    # Start runtime
    await runtime.start()

    # Trigger execution (non-blocking)
    exec_id = await runtime.trigger("api", {"query": "help"})

    # Wait for completion if needed
    result = await runtime.wait_for_completion(exec_id, timeout=300.0)
    if result and result.success:
        print(f"Success: {result.output}")
    else:
        print(f"Failed: {result.error if result else 'timeout'}")

    # Cleanup
    await runtime.stop()


if __name__ == "__main__":
    asyncio.run(main())
Async Entry Points
Headless execution shines with event-driven and scheduled triggers.
Timer-Based Execution
Run agents on a fixed schedule:
from framework.runtime.execution_stream import EntryPointSpec

# Fixed interval (every 5 minutes)
timer_entry = EntryPointSpec(
    id="health_check",
    name="System Health Check",
    entry_node="check-health",
    trigger_type="timer",
    trigger_config={
        "interval_minutes": 5,
        "run_immediately": True,      # Fire on startup
        "idle_timeout_seconds": 300,  # Skip if agent is busy
    },
    isolation_level="isolated",
)
runtime.register_entry_point(timer_entry)
Or use cron expressions for complex schedules:
# Cron expression (business hours, weekdays only)
cron_entry = EntryPointSpec(
    id="report_generator",
    name="Daily Report",
    entry_node="generate-report",
    trigger_type="timer",
    trigger_config={
        "cron": "0 9 * * 1-5",  # 9 AM, Mon-Fri
        "run_immediately": False,
        "idle_timeout_seconds": 300,
    },
    isolation_level="isolated",
)
runtime.register_entry_point(cron_entry)
Timer-based entry points automatically skip execution if the agent is actively working (agent_idle_seconds < idle_timeout). This prevents queue buildup.
Event-Driven Execution
React to events from other entry points or graphs:
from framework.runtime.event_bus import EventType

# React to failures in primary graph
guardian_entry = EntryPointSpec(
    id="guardian",
    name="Failure Guardian",
    entry_node="analyze-failure",
    trigger_type="event",
    trigger_config={
        "event_types": [EventType.EXECUTION_FAILED],
        "exclude_own_graph": True,  # Only watch other graphs
    },
    isolation_level="isolated",
)
runtime.register_entry_point(guardian_entry)
Event filters:
# Filter by stream
EntryPointSpec(
    trigger_type="event",
    trigger_config={
        "event_types": [EventType.NODE_STALLED],
        "filter_stream": "primary",  # Only from this stream
    },
)

# Filter by node
EntryPointSpec(
    trigger_type="event",
    trigger_config={
        "event_types": [EventType.TOOL_CALL_COMPLETED],
        "filter_node": "critical-operation",  # Only this node
    },
)

# Filter by graph
EntryPointSpec(
    trigger_type="event",
    trigger_config={
        "event_types": [EventType.EXECUTION_FAILED],
        "filter_graph": "worker",  # Only from worker graph
    },
)
Webhook Entry Points
Receive HTTP webhooks:
from framework.runtime.agent_runtime import AgentRuntimeConfig

# Configure webhook server
config = AgentRuntimeConfig(
    webhook_host="0.0.0.0",
    webhook_port=8080,
    webhook_routes=[
        {
            "source_id": "github",
            "path": "/webhooks/github",
            "methods": ["POST"],
            "secret": "your-webhook-secret",  # Optional HMAC validation
        },
        {
            "source_id": "slack",
            "path": "/webhooks/slack",
            "methods": ["POST"],
        },
    ],
)
runtime = AgentRuntime(..., config=config)

# Register webhook handler
webhook_entry = EntryPointSpec(
    id="github_handler",
    name="GitHub Webhook Handler",
    entry_node="process-github-event",
    trigger_type="event",  # Listens to WEBHOOK_RECEIVED events
    trigger_config={
        "event_types": [EventType.WEBHOOK_RECEIVED],
        "filter_stream": "github",
    },
    isolation_level="isolated",
)
runtime.register_entry_point(webhook_entry)
Concurrency Control
Per-Entry-Point Limits
EntryPointSpec(
    id="api",
    entry_node="handle-request",
    max_concurrent=50,  # Max 50 parallel executions
)
Global Limits
AgentRuntimeConfig(
    max_concurrent_executions=100,  # Global limit across all entry points
)
Single Execution Mode
For entry points that should never run concurrently:
EntryPointSpec(
    id="singleton",
    entry_node="exclusive-task",
    max_concurrent=1,  # Only one execution at a time
)
If a new execution is triggered while one is already running, the old execution is cancelled and the new one starts in the same session directory.
Lifecycle Management
Graceful Shutdown
import signal
import asyncio

runtime = None


async def shutdown(sig):
    """Stop the runtime cleanly, then halt the event loop."""
    print(f"Received {sig.name}, shutting down...")
    if runtime:
        await runtime.stop()
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated here since Python 3.10.
    asyncio.get_running_loop().stop()


async def main():
    global runtime

    # Setup signal handlers
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            # Bind sig as a default to avoid the late-binding closure pitfall.
            lambda s=sig: asyncio.create_task(shutdown(s)),
        )

    runtime = AgentRuntime(...)
    await runtime.start()

    # Keep running until shutdown
    while runtime._running:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
Timer Pause/Resume
Pause timer-driven entry points without stopping the runtime:
# Pause all timer-driven entry points (the runtime itself keeps running)
runtime.pause_timers()
# Resume previously paused timers
runtime.resume_timers()
Useful for:
Maintenance windows
Rate limit backoff
User-controlled pause
Logging Configuration
Configure structured logging for headless environments:
from framework.observability import configure_logging

# JSON logs for production
configure_logging(level="INFO", format="json")

# Human-readable for development
configure_logging(level="DEBUG", format="human")

# Auto-detect from environment
configure_logging(level="INFO", format="auto")
# Uses JSON if LOG_FORMAT=json or ENV=production
Process Management
systemd Service
[Unit]
Description=Hive Agent Service
After=network.target

[Service]
Type=simple
User=hive
WorkingDirectory=/opt/hive-agent
ExecStart=/opt/hive-agent/venv/bin/python main.py
Restart=always
RestartSec=10
Environment="LOG_FORMAT=json"
Environment="ENV=production"

[Install]
WantedBy=multi-user.target
Docker
FROM python:3.11-slim

WORKDIR /app

# Copy requirements first so the dependency layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENV LOG_FORMAT=json
ENV ENV=production

CMD ["python", "main.py"]
Monitoring Headless Agents
Track runtime health:
# Get runtime statistics
stats = runtime.get_stats()
print(f"Active executions: {stats['active_executions']}")
print(f"Streams: {stats['streams']}")

# Check stream health (NOTE: _streams is a private attribute of the runtime)
for stream_id, stream in runtime._streams.items():
    stream_stats = stream.get_stats()
    print(f"Stream {stream_id}:")
    print(f"  Running: {stream_stats['status_counts'].get('running', 0)}")
    print(f"  Idle seconds: {stream.agent_idle_seconds:.1f}")
Best Practices
Error Handling — Always wrap runtime operations in try-except blocks, and log errors with structured context.
Resource Limits — Set appropriate max_concurrent limits based on available memory and CPU.
Session Cleanup — Periodically clean up old completed sessions to prevent disk bloat.
Observability — Use structured logging and the EventBus to export metrics to monitoring systems.