
Overview

The Host Agent runs as a system service on Linux or Windows hosts, managing the lifecycle of an OpenTelemetry Collector instance. It provides automated configuration management, remote updates, and health monitoring without requiring direct SSH access to the host.

Architecture Components

The host agent architecture consists of three main layers: the service layer, the agent layer, and the collector layer.

Service Layer

The agent runs as a native system service using the kardianos/service library, which provides cross-platform service management. On a systemd-based Linux host, the service is controlled with systemctl:
systemctl status kmagent
systemctl start kmagent
systemctl stop kmagent
The service is registered as kmagent.service and runs with appropriate user permissions.
cmd/kmagent/main.go
svcConfig := &service.Config{
    Name:        "kmagent",
    DisplayName: "KloudMate Agent",
    Description: "KloudMate Agent for OpenTelemetry auto instrumentation",
}
svc, err := service.New(p, svcConfig)

Agent Layer

The agent layer manages the collector lifecycle and configuration updates:
func New(cfg *config.Config, logger *zap.SugaredLogger, opts ...Option) (*Agent, error) {
    configUpdater := updater.NewConfigUpdater(cfg, logger)
    a := Agent{
        cfg:            cfg,
        logger:         logger,
        updater:        configUpdater,
        shutdownSignal: make(chan struct{}),
    }
    return &a, nil
}
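
The `opts ...Option` parameter and the `agent.WithVersion` call used later in this page suggest the functional-options pattern, although the excerpt above does not show how options are applied. The following is a minimal sketch of that pattern under stated assumptions: the `version` field, the `"unknown"` default, and the reduced `Agent` type are illustrative, not the agent's actual implementation.

```go
package main

import "fmt"

// Agent is a pared-down stand-in for the real agent type (assumption).
type Agent struct {
	version string
}

// Option mutates an Agent during construction.
type Option func(*Agent)

// WithVersion records the build version on the agent.
func WithVersion(v string) Option {
	return func(a *Agent) { a.version = v }
}

// New builds an Agent with defaults, then applies each option in order.
func New(opts ...Option) *Agent {
	a := &Agent{version: "unknown"} // hypothetical default
	for _, opt := range opts {
		opt(a)
	}
	return a
}

func main() {
	fmt.Println(New().version)
	fmt.Println(New(WithVersion("1.2.3")).version)
}
```

Options keep the constructor signature stable as new optional settings are added; callers that do not care about a setting simply omit it.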

Collector Layer

The OpenTelemetry Collector is created using the factory pattern with platform-specific components:
internal/agent/collector.go
func NewCollector(c *config.Config) (*otelcol.Collector, error) {
    collectorSettings := shared.CollectorInfoFactory(c.OtelConfigPath)
    return otelcol.NewCollector(collectorSettings)
}
internal/shared/collector_info.go
func CollectorInfoFactory(cfgPath string) otelcol.CollectorSettings {
    info := component.BuildInfo{
        Command:     "kmagent",
        Description: "KloudMate Agent for OpenTelemetry",
        Version:     version.GetCollectorVersion(),
    }
    
    return otelcol.CollectorSettings{
        BuildInfo:               info,
        Factories:               Components,
        DisableGracefulShutdown: true,
        ConfigProviderSettings: otelcol.ConfigProviderSettings{
            ResolverSettings: confmap.ResolverSettings{
                DefaultScheme: "env",
                URIs:          []string{cfgPath},
                ProviderFactories: []confmap.ProviderFactory{
                    envprovider.NewFactory(),
                    fileprovider.NewFactory(),
                    yamlprovider.NewFactory(),
                },
            },
        },
        SkipSettingGRPCLogger: true,
    }
}

Lifecycle Management

The agent manages the collector through distinct lifecycle phases:
1. Initialization

The Program struct is initialized with configuration from CLI flags, environment variables, and config files.
cmd/kmagent/main.go
func (p *Program) Initialize(c *cli.Context) error {
    p.ctx, p.cancelFunc = context.WithCancel(context.Background())
    
    // Load configuration
    err := p.cfg.LoadConfig()
    if err != nil {
        return fmt.Errorf("failed to load configuration: %v", err)
    }
    
    // Create agent
    p.kmAgent, err = agent.New(p.cfg, p.logger, agent.WithVersion(p.version))
    if err != nil {
        return fmt.Errorf("failed to create agent: %v", err)
    }
    
    return nil
}
2. Service Start

The service layer calls the Start method, which launches p.run in a background goroutine and returns immediately. Two concurrent tasks run from there:
  • Collector lifecycle manager
  • Configuration update checker
cmd/kmagent/main.go
func (p *Program) Start(s service.Service) error {
    p.logger.Info("Starting service")
    p.wg.Add(1)
    go p.run()
    return nil
}
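
Service managers generally expect Start to return quickly, which is why the work is handed to a goroutine and tracked with a WaitGroup. The sketch below shows that shape using only the standard library. The `program` type, its `ran` field, and the body of `run` are assumptions for illustration; the real `run` method is not shown in this document.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// program is a simplified stand-in for the service's Program type.
type program struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	ran    bool // set when run has observed cancellation
}

func newProgram() *program {
	ctx, cancel := context.WithCancel(context.Background())
	return &program{ctx: ctx, cancel: cancel}
}

// Start returns immediately; the long-running work happens in run.
func (p *program) Start() error {
	p.wg.Add(1)
	go p.run()
	return nil
}

// run blocks until the context is cancelled, then marks itself finished.
func (p *program) run() {
	defer p.wg.Done()
	<-p.ctx.Done()
	p.ran = true
}

// Stop cancels the context and waits for run to return.
func (p *program) Stop() error {
	p.cancel()
	p.wg.Wait()
	return nil
}

func main() {
	p := newProgram()
	if err := p.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	if err := p.Stop(); err != nil {
		fmt.Println("stop failed:", err)
		return
	}
	fmt.Println("run loop exited:", p.ran)
}
```

Calling `wg.Add(1)` before the `go` statement, as the original Start does, ensures that a Stop racing with Start cannot pass `wg.Wait()` before the goroutine has been counted.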
3. Collector Creation

The agent creates a new collector instance with the current configuration.
internal/agent/agent.go
func (a *Agent) manageCollectorLifecycle(ctx context.Context) error {
    collector, err := NewCollector(a.cfg)
    if err != nil {
        return fmt.Errorf("failed to create new collector instance: %w", err)
    }
    
    a.collectorMu.Lock()
    if !a.isRunning.Load() {
        a.collectorMu.Unlock()
        return nil
    }
    a.collector = collector
    a.collectorMu.Unlock()
    
    runErr := collector.Run(ctx)
    return runErr
}
4. Configuration Watching

A ticker checks the remote API for configuration updates every ConfigCheckInterval seconds. The first check runs immediately rather than waiting for the first tick.
internal/agent/agent.go
func (a *Agent) runConfigUpdateChecker(ctx context.Context) {
    ticker := time.NewTicker(time.Duration(a.cfg.ConfigCheckInterval) * time.Second)
    defer ticker.Stop()
    
    // Trigger the very first config check
    if err := a.performConfigCheck(ctx); err != nil {
        a.logger.Errorf("Periodic config check failed: %v", err)
    }
    
    for {
        select {
        case <-ticker.C:
            if err := a.performConfigCheck(ctx); err != nil {
                a.logger.Errorf("Periodic config check failed: %v", err)
            }
        case <-a.shutdownSignal:
            return
        case <-ctx.Done():
            return
        }
    }
}
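
A `time.Ticker` delivers its first tick only after one full interval, which is why the loop above performs one check before entering the `select`. The sketch below generalizes the same loop so it can be run in isolation. `runPeriodic`, `countChecks`, and the simulated failure are hypothetical names introduced for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runPeriodic calls check once immediately, then on every tick, until ctx is
// cancelled or stop is closed. Errors go to onErr and never end the loop.
func runPeriodic(ctx context.Context, interval time.Duration, stop <-chan struct{},
	check func(context.Context) error, onErr func(error)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	if err := check(ctx); err != nil {
		onErr(err)
	}
	for {
		select {
		case <-ticker.C:
			if err := check(ctx); err != nil {
				onErr(err)
			}
		case <-stop: // a nil channel never fires, so stop is optional
			return
		case <-ctx.Done():
			return
		}
	}
}

// countChecks runs the loop for runForMs milliseconds with a check that always
// fails, and returns how many times the check was invoked.
func countChecks(intervalMs, runForMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(runForMs)*time.Millisecond)
	defer cancel()

	n := 0
	runPeriodic(ctx, time.Duration(intervalMs)*time.Millisecond, nil,
		func(context.Context) error { n++; return errors.New("simulated failure") },
		func(error) {})
	return n
}

func main() {
	// With a one-hour interval only the immediate check can run.
	fmt.Println("long interval:", countChecks(3600000, 50))
	fmt.Println("short interval:", countChecks(10, 300))
}
```

Because every failure is routed to `onErr` and the loop keeps going, a transient network error costs one interval rather than ending configuration checks altogether.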
5. Graceful Shutdown

When the service stops, it cancels the program context, gives the agent up to 30 seconds to shut down the collector, and then waits for all goroutines to complete.
cmd/kmagent/main.go
func (p *Program) Stop(s service.Service) error {
    if p.cancelFunc != nil {
        p.cancelFunc()
    }
    
    if p.kmAgent != nil {
        shutdownCtx, shutdownCancel := context.WithTimeout(
            context.Background(), 30*time.Second)
        defer shutdownCancel()
        
        if err := p.kmAgent.Shutdown(shutdownCtx); err != nil {
            p.logger.Errorf("Error during agent shutdown: %v", err)
        }
    }
    
    p.wg.Wait()
    return nil
}
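
In the Stop method above, the agent shutdown is bounded by a 30-second timeout, but the final `p.wg.Wait()` has no deadline. If a bounded wait were wanted there as well, one possible approach is sketched below. `waitWithContext` and `shutdownDemo` are hypothetical helpers, not part of the agent.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// waitWithContext waits for wg, but gives up when ctx expires.
func waitWithContext(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// shutdownDemo starts a worker that needs workMs milliseconds of cleanup after
// cancellation and reports whether shutdown finished within timeoutMs.
func shutdownDemo(workMs, timeoutMs int) bool {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond) // simulated cleanup
	}()

	cancel()
	shutdownCtx, shutdownCancel := context.WithTimeout(
		context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer shutdownCancel()

	err := waitWithContext(shutdownCtx, &wg)
	return !errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("fast worker finished in time:", shutdownDemo(10, 1000))
	fmt.Println("slow worker finished in time:", shutdownDemo(500, 20))
}
```

The trade-off is that a timed-out wait leaves the helper goroutine blocked until the workers eventually exit, which is usually acceptable when the process is about to terminate anyway.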

Configuration Update Flow

The agent applies a configuration update in five steps:
1. Status Collection

The agent collects current status information about itself and the collector.
internal/agent/agent.go
a.collectorMu.Lock()
params := updater.UpdateCheckerParams{
    Version: a.version,
}
if a.collector != nil {
    params.CollectorStatus = "Running"
} else {
    params.CollectorStatus = "Stopped"
    params.CollectorLastError = a.collectorError
}
a.collectorMu.Unlock()

if a.isRunning.Load() {
    params.AgentStatus = "Running"
} else {
    params.AgentStatus = "Stopped"
}
2. Remote API Call

The updater sends a POST request to the remote API with status information.
internal/updater/updater.go
data := map[string]interface{}{
    "is_docker":          u.cfg.DockerMode,
    "hostname":           u.cfg.Hostname(),
    "platform":           platform,
    "architecture":       runtime.GOARCH,
    "agent_version":      p.Version,
    "collector_version":  version.GetCollectorVersion(),
    "agent_status":       p.AgentStatus,
    "collector_status":   p.CollectorStatus,
    "last_error_message": p.CollectorLastError,
}

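// jsonData is the JSON encoding of data, and reqCtx is the request context.
// Both are defined in code omitted from this excerpt.
req, err := http.NewRequestWithContext(
    reqCtx, "POST", u.cfg.ConfigUpdateURL, bytes.NewBuffer(jsonData))
req.Header.Set("Authorization", u.cfg.APIKey)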
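
The excerpt above omits the marshalling step and the code that sends the request. The sketch below fills in one plausible end-to-end shape and runs it against a local test server. `postStatus`, the 10-second timeout, the `Content-Type` header, and the `test-key` value are assumptions for illustration and are not taken from the agent's source.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// postStatus sends payload as JSON and returns the HTTP status code.
func postStatus(ctx context.Context, url, apiKey string, payload map[string]interface{}) (int, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return 0, fmt.Errorf("failed to marshal payload: %w", err)
	}

	reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) // assumed timeout
	defer cancel()

	req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return 0, fmt.Errorf("failed to build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", apiKey)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

// demo runs postStatus against a local test server that checks the API key.
func demo() (int, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.Header.Get("Authorization") != "test-key" {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	return postStatus(context.Background(), srv.URL, "test-key", map[string]interface{}{
		"agent_status":     "Running",
		"collector_status": "Running",
	})
}

func main() {
	code, err := demo()
	fmt.Println(code, err)
}
```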
3. Response Processing

The API returns whether a restart is required and the new configuration.
internal/updater/updater.go
type ConfigUpdateResponse struct {
    RestartRequired bool                   `json:"restart_required"`
    Config          map[string]interface{} `json:"config"`
}

var updateResp ConfigUpdateResponse
if err := json.NewDecoder(resp.Body).Decode(&updateResp); err != nil {
    return false, nil, fmt.Errorf("failed to decode: %w", err)
}

return updateResp.RestartRequired, updateResp.Config, nil
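
To make the decoding behavior concrete, the sketch below runs the same struct against two sample payloads. The payload contents are hypothetical, since the real API's configuration body is not shown in this document, and `decodeUpdate` is a helper introduced for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// ConfigUpdateResponse mirrors the struct shown in the excerpt above.
type ConfigUpdateResponse struct {
	RestartRequired bool                   `json:"restart_required"`
	Config          map[string]interface{} `json:"config"`
}

// decodeUpdate parses a response body into its restart flag and config map.
func decodeUpdate(body string) (bool, map[string]interface{}, error) {
	var resp ConfigUpdateResponse
	if err := json.NewDecoder(strings.NewReader(body)).Decode(&resp); err != nil {
		return false, nil, fmt.Errorf("failed to decode: %w", err)
	}
	return resp.RestartRequired, resp.Config, nil
}

func main() {
	// Hypothetical payload carrying a new configuration.
	restart, cfg, err := decodeUpdate(
		`{"restart_required": true, "config": {"receivers": {"hostmetrics": {}}}}`)
	fmt.Println(restart, len(cfg), err)

	// When the "config" key is absent, the map stays nil.
	restart, cfg, err = decodeUpdate(`{"restart_required": false}`)
	fmt.Println(restart, cfg == nil, err)
}
```

A missing `config` key decodes to a nil map, which is what allows a caller to guard the restart path with a `newConfig != nil` check.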
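4. Atomic Update

If a restart is required, the agent writes the new configuration to a temporary file next to the existing one and then renames it into place, so a reader never sees a partially written file.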
internal/agent/agent.go
func (a *Agent) UpdateConfig(_ context.Context, newConfig map[string]interface{}) error {
    configYAML, err := yaml.Marshal(newConfig)
    if err != nil {
        return fmt.Errorf("failed to marshal new config: %w", err)
    }
    
    tempFile := a.cfg.OtelConfigPath + ".new"
    if err := os.WriteFile(tempFile, configYAML, 0644); err != nil {
        return fmt.Errorf("failed to write new config: %w", err)
    }
    
    if err := os.Rename(tempFile, a.cfg.OtelConfigPath); err != nil {
        return fmt.Errorf("failed to replace config file: %w", err)
    }
    
    return nil
}
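
The write-then-rename approach relies on `os.Rename` replacing the destination in a single step when both paths are on the same filesystem. A somewhat more defensive variant is sketched below: it uses a uniquely named temporary file, syncs the data to disk before renaming, and cleans up on failure. `writeFileAtomic` is a hypothetical helper and not the agent's code.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomic writes data to a temporary file in the same directory as
// path, syncs it to disk, and renames it over path.
func writeFileAtomic(path string, data []byte, perm os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".*.new")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	tmpName := tmp.Name()
	defer os.Remove(tmpName) // fails harmlessly once the rename has succeeded

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return fmt.Errorf("failed to write temp file: %w", err)
	}
	if err := tmp.Chmod(perm); err != nil {
		tmp.Close()
		return fmt.Errorf("failed to set permissions: %w", err)
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return fmt.Errorf("failed to sync temp file: %w", err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("failed to close temp file: %w", err)
	}
	if err := os.Rename(tmpName, path); err != nil {
		return fmt.Errorf("failed to replace config file: %w", err)
	}
	return nil
}

// demo writes a file twice and returns the final contents.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "kmagent-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "config.yaml")
	for _, content := range []string{"a: 1\n", "a: 2\n"} {
		if err := writeFileAtomic(path, []byte(content), 0o644); err != nil {
			return "", err
		}
	}
	got, err := os.ReadFile(path)
	return string(got), err
}

func main() {
	got, err := demo()
	fmt.Printf("%q %v\n", got, err)
}
```

Creating the temporary file in the destination directory keeps both paths on one filesystem, which is the condition under which the rename is a single replacement.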
5. Collector Restart

The agent stops the current collector and starts a new instance with the updated configuration.
internal/agent/agent.go
if newConfig != nil && restart {
    if err := a.UpdateConfig(ctx, newConfig); err != nil {
        return fmt.Errorf("failed to update config file: %w", err)
    }
    
    a.stopCollectorInstance()
    
    a.wg.Add(1)
    go func() {
        defer a.wg.Done()
        if err := a.manageCollectorLifecycle(agentCtx); err != nil {
            a.collectorError = err.Error()
        } else {
            a.collectorError = ""
        }
    }()
}
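
The restart sequence is stop, then start, and the important property is that the old instance has fully exited before the new one begins. The sketch below illustrates that property with a generic worker in place of the collector. The `supervisor` type and its methods are hypothetical, and `stopCollectorInstance` may well work differently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// supervisor runs at most one worker at a time.
type supervisor struct {
	mu     sync.Mutex
	cancel context.CancelFunc
	done   chan struct{}
	starts int
}

// restart stops the current worker (if any), waits for it to exit, and then
// starts a new one running work.
func (s *supervisor) restart(parent context.Context, work func(context.Context)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stopLocked()

	ctx, cancel := context.WithCancel(parent)
	done := make(chan struct{})
	s.cancel, s.done = cancel, done
	s.starts++
	go func() {
		defer close(done)
		work(ctx)
	}()
}

// stopLocked cancels the worker and blocks until it has returned.
func (s *supervisor) stopLocked() {
	if s.cancel == nil {
		return
	}
	s.cancel()
	<-s.done
	s.cancel, s.done = nil, nil
}

func (s *supervisor) stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stopLocked()
}

// demo restarts a worker three times and reports the number of starts and
// whether two workers were ever alive at once.
func demo() (int, bool) {
	var running atomic.Int32
	var overlapped atomic.Bool
	work := func(ctx context.Context) {
		if running.Add(1) > 1 {
			overlapped.Store(true)
		}
		<-ctx.Done()
		running.Add(-1)
	}

	var s supervisor
	for i := 0; i < 3; i++ {
		s.restart(context.Background(), work)
	}
	s.stop()
	return s.starts, overlapped.Load()
}

func main() {
	starts, overlapped := demo()
	fmt.Println("starts:", starts, "overlapped:", overlapped)
}
```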

Thread Safety

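The agent combines several synchronization primitives: an atomic.Bool for its running state, a mutex (collectorMu) that guards the collector reference, and a sync.WaitGroup that tracks its goroutines. The running-state guard looks like this: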
// Agent running state
isRunning atomic.Bool

// Thread-safe compare-and-swap
if !a.isRunning.CompareAndSwap(false, true) {
    return fmt.Errorf("agent already running")
}
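
The value of `CompareAndSwap` here is that the check and the update happen as one atomic step, so two concurrent callers cannot both observe `false`. The sketch below demonstrates this by racing many goroutines against the same guard. The `agent` type and `raceStart` are stand-ins written for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// agent holds only the running flag for this demonstration.
type agent struct {
	isRunning atomic.Bool
}

// start succeeds for exactly one caller until the flag is reset.
func (a *agent) start() error {
	if !a.isRunning.CompareAndSwap(false, true) {
		return fmt.Errorf("agent already running")
	}
	return nil
}

// raceStart calls start from n goroutines and returns how many succeeded.
func raceStart(n int) int {
	var a agent
	var wg sync.WaitGroup
	var ok atomic.Int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if a.start() == nil {
				ok.Add(1)
			}
		}()
	}
	wg.Wait()
	return int(ok.Load())
}

func main() {
	fmt.Println("successful starts:", raceStart(50))
}
```

A separate `Load` followed by `Store` would leave a window in which several goroutines pass the check before any of them sets the flag.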

Docker Mode

In Docker mode, the agent carries two extra settings (DockerMode and DockerEndpoint) and reads its configuration from a fixed path:
internal/config/config.go
type Config struct {
    DockerMode     bool
    DockerEndpoint string
    // ... other fields
}

func GetDockerConfigPath() string {
    return "/etc/kmagent/config.yaml"
}
The Docker agent uses volume mounts to access host resources:
docker run -d \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -v /var/log:/var/log:ro \
  -v /proc:/host/proc:ro \
  -v /sys:/host/sys:ro \
  -e KM_API_KEY="your-api-key" \
  -e KM_DOCKER_MODE=true \
  kloudmate/km-agent

Configuration Paths

The agent uses platform-specific configuration paths:
internal/config/config.go
func GetDefaultConfigPath() string {
    if runtime.GOOS == "windows" {
        execPath, _ := os.Executable()
        return filepath.Join(filepath.Dir(execPath), "config.yaml")
    } else if runtime.GOOS == "darwin" {
        return "/Library/Application Support/kmagent/config.yaml"
    } else {
        // Linux/Unix
        return "/etc/kmagent/config.yaml"
    }
}
The configuration directory is automatically created if it doesn’t exist, with appropriate permissions (0755).
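
Directory creation of this kind is typically a single `os.MkdirAll` call on the parent of the configuration path. The sketch below shows that call and confirms it is safe to repeat. `ensureConfigDir` is a hypothetical helper, and the demo uses a temporary directory in place of `/etc/kmagent`.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// ensureConfigDir creates the parent directory of configPath if needed.
// MkdirAll returns nil when the directory already exists.
func ensureConfigDir(configPath string) error {
	if err := os.MkdirAll(filepath.Dir(configPath), 0o755); err != nil {
		return fmt.Errorf("failed to create config directory: %w", err)
	}
	return nil
}

// demo creates a nested config directory twice and reports whether it exists.
func demo() (bool, error) {
	root, err := os.MkdirTemp("", "kmagent-demo")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(root)

	cfgPath := filepath.Join(root, "etc", "kmagent", "config.yaml")
	for i := 0; i < 2; i++ {
		if err := ensureConfigDir(cfgPath); err != nil {
			return false, err
		}
	}
	info, err := os.Stat(filepath.Dir(cfgPath))
	if err != nil {
		return false, err
	}
	return info.IsDir(), nil
}

func main() {
	ok, err := demo()
	fmt.Println(ok, err)
}
```

On Unix-like systems the process umask is applied to the requested mode, so the resulting permissions can be narrower than 0755.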

Logging

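The host agent logs through zap, with lumberjack rotating the log file (10 MB per file, one backup, seven-day retention, compressed). File output is written at Info level and console output at Debug level: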
cmd/kmagent/main.go
func getFileLogger(logFile string) (*zap.SugaredLogger, error) {
    writer := zapcore.AddSync(&lumberjack.Logger{
        Filename:   logFile,
        MaxSize:    10, // MB
        MaxBackups: 1,
        MaxAge:     7,    // days
        Compress:   true,
    })
    
    core := zapcore.NewTee(
        zapcore.NewCore(encoder, writer, zapcore.InfoLevel),
        zapcore.NewCore(zapcore.NewConsoleEncoder(encoderConfig), 
            zapcore.AddSync(os.Stdout), zapcore.DebugLevel),
    )
    
    logger := zap.New(core)
    return logger.Sugar(), nil
}

Error Handling

The agent handles three classes of failure:
If the collector fails to start, the error is captured and reported to the remote API on the next status check.
internal/agent/agent.go
runErr := collector.Run(ctx)
if runErr != nil {
    a.collectorError = runErr.Error()
    a.logger.Errorw("collector run loop exited with error", "error", runErr)
}
Failed configuration updates are logged but don’t crash the agent. The collector continues with the previous configuration.
internal/agent/agent.go
if err := a.UpdateConfig(ctx, newConfig); err != nil {
    a.collectorError = err.Error()
    return fmt.Errorf("failed to update config file: %w", err)
}
Network errors during configuration checks are logged but don’t prevent future checks.
internal/updater/updater.go
if respErr != nil {
    return false, nil, fmt.Errorf("failed to fetch config updates: %w", respErr)
}
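
Because these errors are wrapped with `%w`, a caller can still inspect the underlying cause with `errors.Is`. The sketch below shows how a caller might separate timeouts from shutdown-related cancellation. `fetch`, `classify`, and the category names are hypothetical, and whether the agent makes this distinction is not stated in this document.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// fetch simulates a config check that fails with the given cause.
func fetch(cause error) error {
	if cause != nil {
		return fmt.Errorf("failed to fetch config updates: %w", cause)
	}
	return nil
}

// classify reports how a caller might treat an error returned by fetch.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, context.DeadlineExceeded):
		return "timeout"
	case errors.Is(err, context.Canceled):
		return "shutdown"
	default:
		return "other"
	}
}

// demo classifies one error of each kind and joins the results.
func demo() string {
	causes := []error{nil, context.DeadlineExceeded, context.Canceled, errors.New("connection refused")}
	out := make([]string, 0, len(causes))
	for _, c := range causes {
		out = append(out, classify(fetch(c)))
	}
	return strings.Join(out, " ")
}

func main() {
	fmt.Println(demo())
}
```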

Next Steps

  • Collector Lifecycle: deep dive into collector lifecycle management and restart mechanisms
  • Kubernetes Agent: learn about the Kubernetes deployment architecture
  • Configuration: configure the host agent for your environment
  • Deployment: deploy the host agent on Linux systems
