
Overview

The KloudMate Agent manages the OpenTelemetry Collector as an embedded, in-process component with full lifecycle orchestration. This architecture enables configuration-triggered restarts, graceful shutdown, and comprehensive error handling across all deployment modes.

Lifecycle States

The collector progresses through distinct states during its lifecycle:
1. Not Created

Initial state before the collector instance is created.
// Agent initialized, collector not yet created
a.collector == nil
a.isRunning.Load() == false
2. Creating

The collector instance is being instantiated with configuration.
internal/agent/collector.go
func NewCollector(c *config.Config) (*otelcol.Collector, error) {
    collectorSettings := shared.CollectorInfoFactory(c.OtelConfigPath)
    return otelcol.NewCollector(collectorSettings)
}
3. Running

The collector is actively processing telemetry data.
internal/agent/agent.go
a.collectorMu.Lock()
a.collector = collector
a.collectorMu.Unlock()

runErr := collector.Run(ctx)
4. Stopping

The collector is being gracefully shut down.
internal/agent/agent.go
func (a *Agent) stopCollectorInstance() {
    a.collectorMu.Lock()
    collector := a.collector
    a.collector = nil
    a.collectorMu.Unlock()
    
    if collector != nil {
        collector.Shutdown()
    }
}
5. Stopped

The collector has exited and resources are cleaned up.
// Collector reference cleared
a.collector == nil
// Error state captured if applicable
if runErr != nil {
    a.collectorError = runErr.Error()
}
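The five states above can be sketched as an explicit enumeration. This is illustrative only: the agent tracks state implicitly through `a.collector` and `a.isRunning` rather than a dedicated state type, so `CollectorState` and its constants are hypothetical names.

```go
package main

import "fmt"

// CollectorState is a hypothetical enumeration of the five lifecycle
// states described above. The agent itself derives state from
// a.collector (nil or not) and a.isRunning; no such type exists in it.
type CollectorState int

const (
	StateNotCreated CollectorState = iota
	StateCreating
	StateRunning
	StateStopping
	StateStopped
)

func (s CollectorState) String() string {
	return [...]string{"NotCreated", "Creating", "Running", "Stopping", "Stopped"}[s]
}

func main() {
	// Walk the states in lifecycle order.
	for s := StateNotCreated; s <= StateStopped; s++ {
		fmt.Println(s)
	}
}
```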

Host Agent Lifecycle

The host agent implements a concurrent lifecycle management pattern:

Initial Startup

func (a *Agent) StartAgent(ctx context.Context) error {
    // Atomic state transition
    if !a.isRunning.CompareAndSwap(false, true) {
        return fmt.Errorf("agent already running")
    }
    
    setupComplete := false
    defer func() {
        if !setupComplete {
            a.isRunning.Store(false)
            a.logger.Warn("Agent startup failed, reset running state")
        }
    }()
    
    // Launch two goroutines
    a.wg.Add(2)
    
    // Collector lifecycle manager
    go func() {
        defer a.wg.Done()
        if err := a.manageCollectorLifecycle(ctx); err != nil {
            a.logger.Errorf("Initial collector run failed: %v", err)
        }
    }()
    
    // Configuration update checker
    go func() {
        defer a.wg.Done()
        a.runConfigUpdateChecker(ctx)
    }()
    
    setupComplete = true
    return nil
}

Configuration-Triggered Restart

When a configuration change is detected, the agent performs an atomic restart:
1. Detect Change

The configuration updater detects a change via the remote API.
internal/agent/agent.go
restart, newConfig, err := a.updater.CheckForUpdates(ctx, params)
if err != nil {
    return fmt.Errorf("updater.CheckForUpdates failed: %w", err)
}
2. Update Configuration File

The new configuration is written atomically using a temporary file.
internal/agent/agent.go
func (a *Agent) UpdateConfig(_ context.Context, 
    newConfig map[string]interface{}) error {
    
    configYAML, err := yaml.Marshal(newConfig)
    if err != nil {
        return fmt.Errorf("failed to marshal new config to YAML: %w", err)
    }
    
    // Write to temporary file
    tempFile := a.cfg.OtelConfigPath + ".new"
    if err := os.WriteFile(tempFile, configYAML, 0644); err != nil {
        return fmt.Errorf("failed to write new config: %w", err)
    }
    
    // Atomic rename
    if err := os.Rename(tempFile, a.cfg.OtelConfigPath); err != nil {
        return fmt.Errorf("failed to replace config file: %w", err)
    }
    
    return nil
}
3. Stop Current Collector

The running collector is gracefully stopped.
internal/agent/agent.go
func (a *Agent) stopCollectorInstance() {
    a.collectorMu.Lock()
    collector := a.collector
    a.collector = nil
    a.collectorMu.Unlock()
    
    if collector != nil {
        a.logger.Info("shutting down active collector instance")
        collector.Shutdown()
        a.logger.Info("collector shutdown complete")
    }
}
4. Start New Collector

A new collector instance is started with the updated configuration.
internal/agent/agent.go
if newConfig != nil && restart {
    if err := a.UpdateConfig(ctx, newConfig); err != nil {
        a.collectorError = err.Error()
        return fmt.Errorf("failed to update config file: %w", err)
    }
    
    a.stopCollectorInstance()
    
    a.wg.Add(1)
    go func() {
        defer a.wg.Done()
        if err := a.manageCollectorLifecycle(agentCtx); err != nil {
            a.collectorError = err.Error()
        } else {
            a.logger.Info("collector restarted successfully")
            a.collectorError = ""
        }
    }()
}

Graceful Shutdown

The agent implements a multi-phase graceful shutdown:
internal/agent/agent.go
func (a *Agent) Shutdown(ctx context.Context) error {
    // Atomic state transition
    if !a.isRunning.CompareAndSwap(true, false) {
        a.logger.Debug("shutdown called but agent is not running")
        return nil
    }
    
    // Signal all goroutines to stop
    close(a.shutdownSignal)
    
    // Stop collector
    a.logger.Info("stopping collector instance")
    a.stopCollectorInstance()
    
    // Wait for all goroutines with timeout
    waitCh := make(chan struct{})
    go func() {
        a.wg.Wait()
        close(waitCh)
    }()
    
    select {
    case <-waitCh:
        a.logger.Info("all agent goroutines completed")
    case <-ctx.Done():
        a.logger.Errorf("Agent shutdown timed out: %v", ctx.Err())
        return ctx.Err()
    }
    
    return nil
}

Kubernetes Agent Lifecycle

The Kubernetes agent has a simplified lifecycle without remote configuration updates:

Startup

internal/k8sagent/collector.go
func (a *K8sAgent) startInternalCollector() error {
    a.collectorMu.Lock()
    defer a.collectorMu.Unlock()
    
    a.Logger.Info("starting collector instance")
    
    // Get collector settings
    collectorSettings := shared.CollectorInfoFactory(a.otelConfigPath())
    
    // Filter components for deployment mode
    if a.Cfg.DeploymentMode == "DEPLOYMENT" {
        factories, err := collectorSettings.Factories()
        if err == nil {
            // Remove eBPF receiver (requires host access)
            for typeName := range factories.Receivers {
                if typeName.String() == "ebpfreceiver" {
                    delete(factories.Receivers, typeName)
                }
            }
            collectorSettings.Factories = func() (otelcol.Factories, error) {
                return factories, nil
            }
        }
    }
    
    // Create collector context
    a.collectorCtx, a.collectorCancel = context.WithCancel(context.Background())
    
    // Create collector
    collector, err := otelcol.NewCollector(collectorSettings)
    if err != nil {
        a.collectorCancel()
        return fmt.Errorf("failed to create new collector: %w", err)
    }
    a.Collector = collector
    
    // Start collector in goroutine
    a.wg.Add(1)
    go func(col *otelcol.Collector, ctx context.Context) {
        defer a.wg.Done()
        
        a.Logger.Infow("collector starting",
            "configPath", a.otelConfigPath(),
            "deploymentMode", a.Cfg.DeploymentMode,
        )
        
        runErr := col.Run(ctx)
        
        // Clear reference if this is still the active collector
        a.collectorMu.Lock()
        if a.Collector == col {
            a.Collector = nil
        }
        a.collectorMu.Unlock()
        
        if runErr != nil {
            a.Logger.Errorw("collector exited with error", "error", runErr)
        } else {
            a.Logger.Info("collector exited normally")
        }
    }(a.Collector, a.collectorCtx)
    
    return nil
}

Shutdown with Timeout

internal/k8sagent/collector.go
func (a *K8sAgent) stopInternalCollector() {
    a.collectorMu.Lock()
    defer a.collectorMu.Unlock()
    
    if a.Collector == nil {
        a.Logger.Debug("no active collector instance to stop")
        return
    }
    
    a.Logger.Info("stopping collector instance")
    
    // Cancel collector context
    if a.collectorCancel != nil {
        a.collectorCancel()
    }
    
    // Create shutdown timeout
    shutdownCtx, shutdownCancel := context.WithTimeout(
        context.Background(), 10*time.Second)
    defer shutdownCancel()
    
    // Shutdown with timeout monitoring
    done := make(chan struct{})
    go func() {
        a.Collector.Shutdown()
        close(done)
    }()
    
    select {
    case <-done:
        a.Logger.Info("collector instance stopped successfully")
    case <-shutdownCtx.Done():
        a.Logger.Warnw("collector shutdown timed out", 
            "timeout", "10s", 
            "error", shutdownCtx.Err())
    }
    
    a.Collector = nil
    a.collectorCancel = nil
}

Collector Configuration Factory

The collector is configured using a factory that provides all necessary settings:
internal/shared/collector_info.go
func CollectorInfoFactory(cfgPath string) otelcol.CollectorSettings {
    info := component.BuildInfo{
        Command:     "kmagent",
        Description: "KloudMate Agent for OpenTelemetry",
        Version:     version.GetCollectorVersion(),
    }
    
    return otelcol.CollectorSettings{
        BuildInfo:               info,
        Factories:               Components,
        DisableGracefulShutdown: true,
        ConfigProviderSettings: otelcol.ConfigProviderSettings{
            ResolverSettings: confmap.ResolverSettings{
                DefaultScheme: "env",
                URIs:          []string{cfgPath},
                ProviderFactories: []confmap.ProviderFactory{
                    envprovider.NewFactory(),
                    fileprovider.NewFactory(),
                    yamlprovider.NewFactory(),
                },
            },
        },
        SkipSettingGRPCLogger: true,
    }
}
The DisableGracefulShutdown flag is set to true because the agent handles graceful shutdown externally, providing more control over the shutdown sequence.

Thread Safety Mechanisms

The lifecycle implementation uses several synchronization primitives: an atomic.Bool (isRunning) for the running state, a mutex (collectorMu) guarding the collector reference, a WaitGroup (wg) tracking goroutine completion, and a channel (shutdownSignal) broadcasting shutdown. The running state is managed with compare-and-swap operations.
type Agent struct {
    isRunning atomic.Bool
    // ...
}

// Atomic state transition
if !a.isRunning.CompareAndSwap(false, true) {
    return fmt.Errorf("agent already running")
}

Race Condition Prevention

Several patterns prevent race conditions:

Collector Reference Protection

internal/agent/agent.go
// Deferred cleanup ensures proper reference management
defer func() {
    a.collectorMu.Lock()
    defer a.collectorMu.Unlock()
    // Only clear if this is the collector we managed
    if a.collector == collector {
        a.collector = nil
        a.logger.Debug("collector instance cleared")
    }
}()

Atomic State Checks

internal/agent/agent.go
// Check running state while holding lock
a.collectorMu.Lock()
if !a.isRunning.Load() {
    // Agent shutdown between check and lock
    a.collectorMu.Unlock()
    a.logger.Info("agent shutdown initiated, aborting collector start")
    return nil
}
a.collector = collector
a.collectorMu.Unlock()

Error Handling and Recovery

The lifecycle management includes comprehensive error handling:
Creation failures

If collector creation fails, the error is returned and the agent continues running.
internal/agent/agent.go
collector, err := NewCollector(a.cfg)
if err != nil {
    return fmt.Errorf("failed to create new collector instance: %w", err)
}
The next configuration check may trigger a retry.
Runtime errors

If the collector exits with an error, it’s captured for status reporting.
internal/agent/agent.go
runErr := collector.Run(ctx)
if runErr != nil {
    a.collectorError = runErr.Error()
    a.logger.Errorw("collector run loop exited with error", "error", runErr)
} else {
    a.collectorError = ""
}
Configuration update failures

Failed configuration updates are logged and reported but don’t crash the agent.
internal/agent/agent.go
if err := a.UpdateConfig(ctx, newConfig); err != nil {
    a.collectorError = err.Error()
    return fmt.Errorf("failed to update config file: %w", err)
}
Shutdown timeouts

If shutdown takes too long, it’s logged but the agent proceeds.
internal/agent/agent.go
select {
case <-waitCh:
    a.logger.Info("all agent goroutines completed")
case <-ctx.Done():
    a.logger.Errorf("Agent shutdown timed out: %v", ctx.Err())
    return ctx.Err()
}

Status Reporting

The agent reports detailed status information to the remote API:
internal/updater/updater.go
type UpdateCheckerParams struct {
    Version            string
    AgentStatus        string
    CollectorStatus    string
    CollectorLastError string
}

data := map[string]interface{}{
    "is_docker":          u.cfg.DockerMode,
    "hostname":           u.cfg.Hostname(),
    "platform":           platform,
    "architecture":       runtime.GOARCH,
    "agent_version":      p.Version,
    "collector_version":  version.GetCollectorVersion(),
    "agent_status":       p.AgentStatus,
    "collector_status":   p.CollectorStatus,
    "last_error_message": p.CollectorLastError,
}

Configuration Update Checker

The configuration checker runs periodically with immediate first check:
internal/agent/agent.go
func (a *Agent) runConfigUpdateChecker(ctx context.Context) {
    if a.cfg.ConfigUpdateURL == "" {
        a.logger.Debug("config update URL not configured")
        return
    }
    
    ticker := time.NewTicker(
        time.Duration(a.cfg.ConfigCheckInterval) * time.Second)
    defer ticker.Stop()
    
    // Immediate first check
    if err := a.performConfigCheck(ctx); err != nil {
        a.logger.Errorf("Periodic config check failed: %v", err)
    }
    
    for {
        select {
        case <-ticker.C:
            if err := a.performConfigCheck(ctx); err != nil {
                a.logger.Errorf("Periodic config check failed: %v", err)
            }
        case <-a.shutdownSignal:
            return
        case <-ctx.Done():
            return
        }
    }
}

Best Practices

Atomic Updates

Always use atomic file operations (write to temp, then rename) for configuration updates to prevent partial reads.

Graceful Shutdown

Always provide shutdown timeouts to prevent indefinite hangs during collector shutdown.

Error Capture

Capture and report collector errors for visibility into runtime issues.

State Protection

Use mutexes and atomic operations to protect shared state from race conditions.

Next Steps

Host Agent

Learn about host agent-specific lifecycle management

Kubernetes Agent

Understand Kubernetes agent lifecycle patterns

Configuration

Configure collector behavior and components

Troubleshooting

Resolve lifecycle and startup issues
