Overview

Headless mode allows Hive agents to run autonomously in production without requiring terminal UI (TUI) interaction. This is essential for:
  • Background jobs and scheduled tasks
  • Webhook handlers and API endpoints
  • CI/CD pipelines and automated testing
  • Long-running daemon processes

Running Headless

Direct Runtime Execution

Create and manage the runtime directly:
import asyncio
from pathlib import Path
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig
from framework.runtime.execution_stream import EntryPointSpec

async def main():
    # Create runtime configuration
    config = AgentRuntimeConfig(
        max_concurrent_executions=100,
        cache_ttl=60.0,
        execution_result_max=1000,
    )
    
    # Initialize runtime
    runtime = AgentRuntime(
        graph=my_graph,
        goal=my_goal,
        storage_path=Path("./storage"),
        llm=llm_provider,
        tools=tool_list,
        tool_executor=tool_executor,
        config=config,
    )
    
    # Register entry points
    runtime.register_entry_point(EntryPointSpec(
        id="api",
        name="API Handler",
        entry_node="process-request",
        trigger_type="api",
        isolation_level="shared",
        max_concurrent=10,
    ))
    
    # Start runtime
    await runtime.start()
    
    # Trigger execution (non-blocking)
    exec_id = await runtime.trigger("api", {"query": "help"})
    
    # Wait for completion if needed
    result = await runtime.wait_for_completion(exec_id, timeout=300.0)
    
    if result and result.success:
        print(f"Success: {result.output}")
    else:
        print(f"Failed: {result.error if result else 'timeout'}")
    
    # Cleanup
    await runtime.stop()

if __name__ == "__main__":
    asyncio.run(main())
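If you need to fan out several requests, trigger them first and await the results together. A minimal sketch reusing the trigger and wait_for_completion calls from the example above; the entry point id and payloads are illustrative:
async def run_batch(runtime):
    # Fire several non-blocking triggers against the "api" entry point
    exec_ids = [
        await runtime.trigger("api", {"query": q})
        for q in ("status", "help")
    ]
    # Await all completions concurrently
    return await asyncio.gather(
        *(runtime.wait_for_completion(eid, timeout=300.0) for eid in exec_ids)
    )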

Async Entry Points

Headless execution shines with event-driven and scheduled triggers.

Timer-Based Execution

Run agents on a fixed schedule:
from framework.runtime.execution_stream import EntryPointSpec

# Fixed interval (every 5 minutes)
timer_entry = EntryPointSpec(
    id="health_check",
    name="System Health Check",
    entry_node="check-health",
    trigger_type="timer",
    trigger_config={
        "interval_minutes": 5,
        "run_immediately": True,  # Fire on startup
        "idle_timeout_seconds": 300,  # Skip if agent is busy
    },
    isolation_level="isolated",
)

runtime.register_entry_point(timer_entry)
Or use cron expressions for complex schedules:
# Cron expression (business hours, weekdays only)
cron_entry = EntryPointSpec(
    id="report_generator",
    name="Daily Report",
    entry_node="generate-report",
    trigger_type="timer",
    trigger_config={
        "cron": "0 9 * * 1-5",  # 9 AM, Mon-Fri
        "run_immediately": False,
        "idle_timeout_seconds": 300,
    },
    isolation_level="isolated",
)

runtime.register_entry_point(cron_entry)
Timer-based entry points automatically skip a scheduled run while the agent is actively working, i.e. when agent_idle_seconds is below idle_timeout_seconds. This prevents executions from queuing up behind a busy agent.

Event-Driven Execution

React to events from other entry points or graphs:
from framework.runtime.event_bus import EventType

# React to failures in primary graph
guardian_entry = EntryPointSpec(
    id="guardian",
    name="Failure Guardian",
    entry_node="analyze-failure",
    trigger_type="event",
    trigger_config={
        "event_types": [EventType.EXECUTION_FAILED],
        "exclude_own_graph": True,  # Only watch other graphs
    },
    isolation_level="isolated",
)

runtime.register_entry_point(guardian_entry)
Event filters:
# Filter by stream
EntryPointSpec(
    trigger_type="event",
    trigger_config={
        "event_types": [EventType.NODE_STALLED],
        "filter_stream": "primary",  # Only from this stream
    },
)

# Filter by node
EntryPointSpec(
    trigger_type="event",
    trigger_config={
        "event_types": [EventType.TOOL_CALL_COMPLETED],
        "filter_node": "critical-operation",  # Only this node
    },
)

# Filter by graph
EntryPointSpec(
    trigger_type="event",
    trigger_config={
        "event_types": [EventType.EXECUTION_FAILED],
        "filter_graph": "worker",  # Only from worker graph
    },
)

Webhook Entry Points

Receive HTTP webhooks:
from framework.runtime.agent_runtime import AgentRuntimeConfig

# Configure webhook server
config = AgentRuntimeConfig(
    webhook_host="0.0.0.0",
    webhook_port=8080,
    webhook_routes=[
        {
            "source_id": "github",
            "path": "/webhooks/github",
            "methods": ["POST"],
            "secret": "your-webhook-secret",  # Optional HMAC validation
        },
        {
            "source_id": "slack",
            "path": "/webhooks/slack",
            "methods": ["POST"],
        },
    ],
)

runtime = AgentRuntime(..., config=config)

# Register webhook handler
webhook_entry = EntryPointSpec(
    id="github_handler",
    name="GitHub Webhook Handler",
    entry_node="process-github-event",
    trigger_type="event",  # Listens to WEBHOOK_RECEIVED events
    trigger_config={
        "event_types": [EventType.WEBHOOK_RECEIVED],
        "filter_stream": "github",
    },
    isolation_level="isolated",
)

runtime.register_entry_point(webhook_entry)
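To smoke-test a route locally, POST a payload to the configured path. A sketch assuming the runtime is listening on localhost:8080 as configured above and that the slack route (no secret configured) is used, so no HMAC signature is attached:
import json
import urllib.request

payload = json.dumps({"event": "message", "text": "hello"}).encode()
req = urllib.request.Request(
    "http://localhost:8080/webhooks/slack",
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.status)  # Expect a 2xx if the event was accepted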

Concurrency Control

Per-Entry-Point Limits

EntryPointSpec(
    id="api",
    entry_node="handle-request",
    max_concurrent=50,  # Max 50 parallel executions
)

Global Limits

AgentRuntimeConfig(
    max_concurrent_executions=100,  # Global limit across all entry points
)

Single Execution Mode

For entry points that should never run concurrently:
EntryPointSpec(
    id="singleton",
    entry_node="exclusive-task",
    max_concurrent=1,  # Only one execution at a time
)
If a new execution is triggered while one is already running, the old execution is cancelled and the new one starts in the same session directory.

Lifecycle Management

Graceful Shutdown

import signal
import asyncio

runtime = None

async def shutdown(sig):
    print(f"Received {sig.name}, shutting down...")
    if runtime:
        await runtime.stop()
    # main()'s wait loop exits once the runtime has stopped

async def main():
    global runtime
    
    # Setup signal handlers
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(shutdown(s)),
        )
    
    runtime = AgentRuntime(...)
    await runtime.start()
    
    # Keep running until shutdown
    while runtime._running:
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())

Timer Pause/Resume

Pause timer-driven entry points without stopping the runtime:
# Pause all timers
runtime.pause_timers()

# Resume timers
runtime.resume_timers()
Useful for:
  • Maintenance windows
  • Rate limit backoff
  • User-controlled pause
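For example, a maintenance window can be bracketed by the two calls. A sketch; perform_maintenance() is a stand-in for whatever work you run while timers are paused:
runtime.pause_timers()
try:
    await perform_maintenance()  # hypothetical maintenance coroutine you provide
finally:
    runtime.resume_timers()  # always resume, even if maintenance fails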

Logging Configuration

Configure structured logging for headless environments:
from framework.observability import configure_logging

# JSON logs for production
configure_logging(level="INFO", format="json")

# Human-readable for development
configure_logging(level="DEBUG", format="human")

# Auto-detect from environment
configure_logging(level="INFO", format="auto")
# Uses JSON if LOG_FORMAT=json or ENV=production

Process Management

systemd Service

[Unit]
Description=Hive Agent Service
After=network.target

[Service]
Type=simple
User=hive
WorkingDirectory=/opt/hive-agent
ExecStart=/opt/hive-agent/venv/bin/python main.py
Restart=always
RestartSec=10
Environment="LOG_FORMAT=json"
Environment="ENV=production"

[Install]
WantedBy=multi-user.target

Docker

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

ENV LOG_FORMAT=json
ENV ENV=production

CMD ["python", "main.py"]

Monitoring Headless Agents

Track runtime health:
# Get runtime statistics
stats = runtime.get_stats()
print(f"Active executions: {stats['active_executions']}")
print(f"Streams: {stats['streams']}")

# Check stream health
for stream_id, stream in runtime._streams.items():
    stream_stats = stream.get_stats()
    print(f"Stream {stream_id}:")
    print(f"  Running: {stream_stats['status_counts'].get('running', 0)}")
    print(f"  Idle seconds: {stream.agent_idle_seconds:.1f}")

Best Practices

Error Handling

Always wrap runtime operations in try-except blocks. Log errors with structured context.
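A minimal sketch, inside an async context; the logger name and extra fields are illustrative:
import logging

logger = logging.getLogger("hive.headless")

try:
    exec_id = await runtime.trigger("api", {"query": "help"})
    result = await runtime.wait_for_completion(exec_id, timeout=300.0)
except Exception:
    # Attach structured context so headless failures are traceable
    logger.exception("execution failed", extra={"entry_point": "api"})
    raise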

Resource Limits

Set appropriate max_concurrent limits based on available memory and CPU.
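As a starting point, the limits can be sized from the host; the multiplier below is an arbitrary heuristic, not a framework default, so tune it against observed memory and CPU usage:
import os

cpu_count = os.cpu_count() or 1
config = AgentRuntimeConfig(
    max_concurrent_executions=cpu_count * 4,  # arbitrary starting heuristic
)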

Session Cleanup

Periodically clean up old completed sessions to prevent disk bloat.
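One way to do this at the filesystem level. This sketch assumes completed sessions live as subdirectories under the storage_path passed to AgentRuntime; verify the layout for your deployment before deleting anything:
import shutil
import time
from pathlib import Path

def cleanup_sessions(storage_path: Path, max_age_days: int = 7) -> None:
    """Delete session directories older than max_age_days (assumed layout)."""
    cutoff = time.time() - max_age_days * 86400
    for session_dir in storage_path.iterdir():
        if session_dir.is_dir() and session_dir.stat().st_mtime < cutoff:
            shutil.rmtree(session_dir, ignore_errors=True)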

Observability

Use structured logging and EventBus to export metrics to monitoring systems.
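One low-lift option is to export the stats shown in the monitoring section as structured log records on a loop; the interval and logger name here are arbitrary:
import asyncio
import logging

logger = logging.getLogger("hive.metrics")

async def export_metrics(runtime, interval: float = 60.0) -> None:
    # Emit runtime stats as structured logs for a log-based metrics pipeline
    while True:
        stats = runtime.get_stats()
        logger.info("runtime_stats", extra={"stats": stats})
        await asyncio.sleep(interval)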
