Overview
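The KloudMate Agent embeds the OpenTelemetry Collector and runs it in-process as an otelcol.Collector instance in a dedicated goroutine, rather than as a separate child process. The agent coordinates the collector's lifecycle: it restarts the collector when the remote configuration changes, shuts it down gracefully, and records collector errors for status reporting in both the host and Kubernetes deployment modes.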
Lifecycle States
The collector progresses through distinct states during its lifecycle:
Not Created
Initial state before the collector instance is created.
// Agent initialized, collector not yet created
a.collector == nil
a.isRunning.Load() == false
Creating
The collector instance is being instantiated with its configuration.
internal/agent/collector.go
func NewCollector(c *config.Config) (*otelcol.Collector, error) {
	collectorSettings := shared.CollectorInfoFactory(c.OtelConfigPath)
	return otelcol.NewCollector(collectorSettings)
}
Running
The collector is actively processing telemetry data.
a.collectorMu.Lock()
a.collector = collector
a.collectorMu.Unlock()
// Run blocks until the collector shuts down or ctx is cancelled
runErr := collector.Run(ctx)
Stopping
The collector is being gracefully shut down.
func (a *Agent) stopCollectorInstance() {
	a.collectorMu.Lock()
	collector := a.collector
	a.collector = nil
	a.collectorMu.Unlock()
	if collector != nil {
		collector.Shutdown()
	}
}
Stopped
The collector has exited and its resources are cleaned up.
// Collector reference cleared
a.collector == nil
// Error state captured if applicable
if runErr != nil {
	a.collectorError = runErr.Error()
}
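The states above form a short cycle, with a restart re-entering Creating. As an illustration only, the allowed transitions could be encoded in a small table. CollectorState and canTransition are hypothetical names, and the failure edges (Creating back to NotCreated, Running straight to Stopped) are assumptions drawn from the error-handling behavior described later, not code from the agent.

```go
package main

// CollectorState is a hypothetical enumeration of the lifecycle states above.
type CollectorState int

const (
	NotCreated CollectorState = iota
	Creating
	Running
	Stopping
	Stopped
)

func (s CollectorState) String() string {
	switch s {
	case NotCreated:
		return "NotCreated"
	case Creating:
		return "Creating"
	case Running:
		return "Running"
	case Stopping:
		return "Stopping"
	case Stopped:
		return "Stopped"
	}
	return "Unknown"
}

// allowed lists the legal next states for each state (assumed, for illustration).
var allowed = map[CollectorState][]CollectorState{
	NotCreated: {Creating},
	Creating:   {Running, NotCreated}, // assumption: a creation failure leaves no instance
	Running:    {Stopping, Stopped},   // assumption: Run may also exit on its own with an error
	Stopping:   {Stopped},
	Stopped:    {Creating}, // a restart creates a fresh instance
}

// canTransition reports whether moving from one state to another is allowed.
func canTransition(from, to CollectorState) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}
```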
Host Agent Lifecycle
The host agent runs collector management and configuration polling concurrently, each in its own goroutine tracked by a WaitGroup:
Initial Startup
func (a *Agent) StartAgent(ctx context.Context) error {
	// Atomic state transition
	if !a.isRunning.CompareAndSwap(false, true) {
		return fmt.Errorf("agent already running")
	}
	setupComplete := false
	defer func() {
		if !setupComplete {
			a.isRunning.Store(false)
			a.logger.Warn("Agent startup failed, reset running state")
		}
	}()
	// Launch two goroutines
	a.wg.Add(2)
	// Collector lifecycle manager
	go func() {
		defer a.wg.Done()
		if err := a.manageCollectorLifecycle(ctx); err != nil {
			a.logger.Errorf("Initial collector run failed: %v", err)
		}
	}()
	// Configuration update checker
	go func() {
		defer a.wg.Done()
		a.runConfigUpdateChecker(ctx)
	}()
	setupComplete = true
	return nil
}
Configuration-Triggered Restart
When a configuration change is detected, the agent atomically replaces the configuration file and then restarts the collector:
Detect Change
The configuration updater detects a change via the remote API.
restart, newConfig, err := a.updater.CheckForUpdates(ctx, params)
if err != nil {
	return fmt.Errorf("updater.CheckForUpdates failed: %w", err)
}
Update Configuration File
The new configuration is written atomically using a temporary file.
func (a *Agent) UpdateConfig(_ context.Context, newConfig map[string]interface{}) error {
	configYAML, err := yaml.Marshal(newConfig)
	if err != nil {
		return fmt.Errorf("failed to marshal new config to YAML: %w", err)
	}
	// Write to temporary file
	tempFile := a.cfg.OtelConfigPath + ".new"
	if err := os.WriteFile(tempFile, configYAML, 0644); err != nil {
		return fmt.Errorf("failed to write new config: %w", err)
	}
	// Atomic rename
	if err := os.Rename(tempFile, a.cfg.OtelConfigPath); err != nil {
		return fmt.Errorf("failed to replace config file: %w", err)
	}
	return nil
}
Stop Current Collector
The running collector is gracefully stopped.
func (a *Agent) stopCollectorInstance() {
	a.collectorMu.Lock()
	collector := a.collector
	a.collector = nil
	a.collectorMu.Unlock()
	if collector != nil {
		a.logger.Info("shutting down active collector instance")
		collector.Shutdown()
		a.logger.Info("collector shutdown complete")
	}
}
Start New Collector
A new collector instance is started with the updated configuration.
if newConfig != nil && restart {
	if err := a.UpdateConfig(ctx, newConfig); err != nil {
		a.collectorError = err.Error()
		return fmt.Errorf("failed to update config file: %w", err)
	}
	a.stopCollectorInstance()
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		if err := a.manageCollectorLifecycle(agentCtx); err != nil {
			a.collectorError = err.Error()
		} else {
			a.logger.Info("collector restarted successfully")
			a.collectorError = ""
		}
	}()
}
Graceful Shutdown
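The agent shuts down in phases: it atomically marks itself as not running, closes the shutdown signal channel, stops the collector, and then waits for all goroutines until the context is done: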
func (a *Agent) Shutdown(ctx context.Context) error {
	// Atomic state transition
	if !a.isRunning.CompareAndSwap(true, false) {
		a.logger.Debug("shutdown called but agent is not running")
		return nil
	}
	// Signal all goroutines to stop
	close(a.shutdownSignal)
	// Stop collector
	a.logger.Info("stopping collector instance")
	a.stopCollectorInstance()
	// Wait for all goroutines with timeout
	waitCh := make(chan struct{})
	go func() {
		a.wg.Wait()
		close(waitCh)
	}()
	select {
	case <-waitCh:
		a.logger.Info("all agent goroutines completed")
	case <-ctx.Done():
		a.logger.Errorf("Agent shutdown timed out: %v", ctx.Err())
		return ctx.Err()
	}
	return nil
}
Kubernetes Agent Lifecycle
The Kubernetes agent has a simplified lifecycle without remote configuration updates:
Startup
internal/k8sagent/collector.go
func (a *K8sAgent) startInternalCollector() error {
	a.collectorMu.Lock()
	defer a.collectorMu.Unlock()
	a.Logger.Info("starting collector instance")
	// Get collector settings
	collectorSettings := shared.CollectorInfoFactory(a.otelConfigPath())
	// Filter components for deployment mode
	if a.Cfg.DeploymentMode == "DEPLOYMENT" {
		factories, err := collectorSettings.Factories()
		if err == nil {
			// Remove eBPF receiver (requires host access)
			for typeName := range factories.Receivers {
				if typeName.String() == "ebpfreceiver" {
					delete(factories.Receivers, typeName)
				}
			}
			collectorSettings.Factories = func() (otelcol.Factories, error) {
				return factories, nil
			}
		}
	}
	// Create collector context
	a.collectorCtx, a.collectorCancel = context.WithCancel(context.Background())
	// Create collector
	collector, err := otelcol.NewCollector(collectorSettings)
	if err != nil {
		a.collectorCancel()
		return fmt.Errorf("failed to create new collector: %w", err)
	}
	a.Collector = collector
	// Start collector in goroutine
	a.wg.Add(1)
	go func(col *otelcol.Collector, ctx context.Context) {
		defer a.wg.Done()
		a.Logger.Infow("collector starting",
			"configPath", a.otelConfigPath(),
			"deploymentMode", a.Cfg.DeploymentMode,
		)
		runErr := col.Run(ctx)
		// Clear reference if this is still the active collector
		a.collectorMu.Lock()
		if a.Collector == col {
			a.Collector = nil
		}
		a.collectorMu.Unlock()
		if runErr != nil {
			a.Logger.Errorw("collector exited with error", "error", runErr)
		} else {
			a.Logger.Info("collector exited normally")
		}
	}(a.Collector, a.collectorCtx)
	return nil
}
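The eBPF filter replaces the Factories function with a closure over a pre-filtered result. A variation under simplified, assumed types (Factories here is a plain map of receiver names, not otelcol.Factories) wraps the original function instead, so filtering happens whenever the collector requests its factories. withoutReceivers is a hypothetical helper.

```go
package main

// Factories is a hypothetical, simplified stand-in for otelcol.Factories:
// receiver type names mapped to a factory description.
type Factories struct {
	Receivers map[string]string
}

// withoutReceivers wraps a factories function so that the named receiver
// types are dropped from its result. Unlike the snippet above, which falls
// back to the unfiltered settings when Factories() fails, this version
// defers the call and returns the error to whoever builds the collector.
func withoutReceivers(base func() (Factories, error), drop ...string) func() (Factories, error) {
	return func() (Factories, error) {
		f, err := base()
		if err != nil {
			return Factories{}, err
		}
		// Copy into a new map so the base result is not mutated.
		filtered := make(map[string]string, len(f.Receivers))
		for name, factory := range f.Receivers {
			filtered[name] = factory
		}
		for _, name := range drop {
			delete(filtered, name)
		}
		f.Receivers = filtered
		return f, nil
	}
}
```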
Shutdown with Timeout
internal/k8sagent/collector.go
func (a *K8sAgent) stopInternalCollector() {
	a.collectorMu.Lock()
	defer a.collectorMu.Unlock()
	if a.Collector == nil {
		a.Logger.Debug("no active collector instance to stop")
		return
	}
	a.Logger.Info("stopping collector instance")
	// Cancel collector context
	if a.collectorCancel != nil {
		a.collectorCancel()
	}
	// Create shutdown timeout
	shutdownCtx, shutdownCancel := context.WithTimeout(
		context.Background(), 10*time.Second)
	defer shutdownCancel()
	// Capture the instance so the goroutine below never reads a.Collector
	// after it is reset at the end of this function (e.g. on timeout)
	col := a.Collector
	// Shutdown with timeout monitoring
	done := make(chan struct{})
	go func() {
		col.Shutdown()
		close(done)
	}()
	select {
	case <-done:
		a.Logger.Info("collector instance stopped successfully")
	case <-shutdownCtx.Done():
		a.Logger.Warnw("collector shutdown timed out",
			"timeout", "10s",
			"error", shutdownCtx.Err())
	}
	a.Collector = nil
	a.collectorCancel = nil
}
Collector Configuration Factory
The collector is configured using a factory that provides all necessary settings:
internal/shared/collector_info.go
func CollectorInfoFactory(cfgPath string) otelcol.CollectorSettings {
	info := component.BuildInfo{
		Command:     "kmagent",
		Description: "KloudMate Agent for OpenTelemetry",
		Version:     version.GetCollectorVersion(),
	}
	return otelcol.CollectorSettings{
		BuildInfo:               info,
		Factories:               Components,
		DisableGracefulShutdown: true,
		ConfigProviderSettings: otelcol.ConfigProviderSettings{
			ResolverSettings: confmap.ResolverSettings{
				DefaultScheme: "env",
				URIs:          []string{cfgPath},
				ProviderFactories: []confmap.ProviderFactory{
					envprovider.NewFactory(),
					fileprovider.NewFactory(),
					yamlprovider.NewFactory(),
				},
			},
		},
		SkipSettingGRPCLogger: true,
	}
}
DisableGracefulShutdown is set to true so that the collector does not react to SIGINT or SIGTERM on its own. The agent owns the shutdown sequence and calls Shutdown on the collector explicitly, which gives it control over ordering and timeouts.
Thread Safety Mechanisms
The lifecycle implementation uses multiple synchronization primitives:
Atomic Bool
Used for the agent's running state, with compare-and-swap transitions.
type Agent struct {
	isRunning atomic.Bool
	// ...
}
// Atomic state transition
if !a.isRunning.CompareAndSwap(false, true) {
	return fmt.Errorf("agent already running")
}
Mutex
Protects the collector reference during concurrent access.
type Agent struct {
	collectorMu sync.Mutex
	collector   *otelcol.Collector
	// ...
}
a.collectorMu.Lock()
collector := a.collector
a.collector = nil
a.collectorMu.Unlock()
WaitGroup
Tracks running goroutines for coordinated shutdown.
type Agent struct {
	wg sync.WaitGroup
	// ...
}
a.wg.Add(2)
go func() {
	defer a.wg.Done()
	// ... work
}()
// Wait for all goroutines
a.wg.Wait()
Channels
Signal shutdown and coordinate goroutines.
type Agent struct {
	shutdownSignal chan struct{}
	// ...
}
// Signal shutdown
close(a.shutdownSignal)
// Wait for signal
select {
case <-a.shutdownSignal:
	return
case <-ticker.C:
	// ... work
}
Race Condition Prevention
Several patterns prevent race conditions:
Collector Reference Protection
// Deferred cleanup ensures proper reference management
defer func() {
	a.collectorMu.Lock()
	defer a.collectorMu.Unlock()
	// Only clear if this is the collector we managed
	if a.collector == collector {
		a.collector = nil
		a.logger.Debug("collector instance cleared")
	}
}()
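The two reference-handling patterns, detaching under the lock and clearing only a matching instance, can be captured in one small type. holder and instance are hypothetical stand-ins for the agent's fields and *otelcol.Collector.

```go
package main

import "sync"

// instance is a hypothetical stand-in for *otelcol.Collector.
type instance struct{ id int }

// holder guards the "current instance" pointer.
type holder struct {
	mu  sync.Mutex
	cur *instance
}

// Set publishes i as the current instance.
func (h *holder) Set(i *instance) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.cur = i
}

// Take detaches the current instance under the lock and returns it, so the
// caller can shut it down without holding the mutex.
func (h *holder) Take() *instance {
	h.mu.Lock()
	defer h.mu.Unlock()
	i := h.cur
	h.cur = nil
	return i
}

// ClearIf clears the reference only if it still points at i, so a goroutine
// whose instance was already replaced by a restart leaves the newer one alone.
func (h *holder) ClearIf(i *instance) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.cur != i {
		return false
	}
	h.cur = nil
	return true
}
```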
Atomic State Checks
// Check running state while holding lock
a.collectorMu.Lock()
if !a.isRunning.Load() {
	// Agent shutdown between check and lock
	a.collectorMu.Unlock()
	a.logger.Info("agent shutdown initiated, aborting collector start")
	return nil
}
a.collector = collector
a.collectorMu.Unlock()
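The check-under-lock pattern works because the publisher and the shutdown path serialize on the same mutex. The sketch below is a simplified model under that assumption: registry and instance are hypothetical, and for simplicity Shutdown clears the flag while holding the lock, whereas the agent flips isRunning before calling stopCollectorInstance.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// instance is a hypothetical stand-in for a collector.
type instance struct{ name string }

// registry models the running flag plus the collector slot. Because Publish
// and Shutdown both hold mu, a shutdown either sees the published instance
// or makes the publish fail; an instance is never stored after shutdown
// has already looked at the slot.
type registry struct {
	mu      sync.Mutex
	running atomic.Bool
	cur     *instance
}

// Publish stores v as the current instance unless shutdown has started.
func (r *registry) Publish(v *instance) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.running.Load() {
		return false // shutdown began between creating v and taking the lock
	}
	r.cur = v
	return true
}

// Shutdown clears the running flag and detaches the current instance.
func (r *registry) Shutdown() *instance {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.running.Store(false)
	v := r.cur
	r.cur = nil
	return v
}
```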
Error Handling and Recovery
Each lifecycle stage handles its own failures explicitly:
Collector Creation Failure
If collector creation fails, the error is returned and the agent keeps running.
collector, err := NewCollector(a.cfg)
if err != nil {
	return fmt.Errorf("failed to create new collector instance: %w", err)
}
The next configuration check may trigger a retry.
Collector Runtime Failure
If the collector exits with an error, the error is captured for status reporting.
runErr := collector.Run(ctx)
if runErr != nil {
	a.collectorError = runErr.Error()
	a.logger.Errorw("collector run loop exited with error", "error", runErr)
} else {
	a.collectorError = ""
}
Configuration Update Failure
Failed configuration updates are logged and reported, but they do not crash the agent.
if err := a.UpdateConfig(ctx, newConfig); err != nil {
	a.collectorError = err.Error()
	return fmt.Errorf("failed to update config file: %w", err)
}
Shutdown Timeout
If the agent's goroutines do not finish before the shutdown context is done, the timeout is logged and Shutdown returns ctx.Err() instead of waiting indefinitely.
select {
case <-waitCh:
	a.logger.Info("all agent goroutines completed")
case <-ctx.Done():
	a.logger.Errorf("Agent shutdown timed out: %v", ctx.Err())
	return ctx.Err()
}
Status Reporting
The agent reports detailed status information to the remote API:
internal/updater/updater.go
type UpdateCheckerParams struct {
	Version            string
	AgentStatus        string
	CollectorStatus    string
	CollectorLastError string
}
data := map[string]interface{}{
	"is_docker":          u.cfg.DockerMode,
	"hostname":           u.cfg.Hostname(),
	"platform":           platform,
	"architecture":       runtime.GOARCH,
	"agent_version":      p.Version,
	"collector_version":  version.GetCollectorVersion(),
	"agent_status":       p.AgentStatus,
	"collector_status":   p.CollectorStatus,
	"last_error_message": p.CollectorLastError,
}
Configuration Update Checker
The configuration checker runs an immediate first check and then repeats it every ConfigCheckInterval seconds:
func (a *Agent) runConfigUpdateChecker(ctx context.Context) {
	if a.cfg.ConfigUpdateURL == "" {
		a.logger.Debug("config update URL not configured")
		return
	}
	ticker := time.NewTicker(
		time.Duration(a.cfg.ConfigCheckInterval) * time.Second)
	defer ticker.Stop()
	// Immediate first check
	if err := a.performConfigCheck(ctx); err != nil {
		a.logger.Errorf("Periodic config check failed: %v", err)
	}
	for {
		select {
		case <-ticker.C:
			if err := a.performConfigCheck(ctx); err != nil {
				a.logger.Errorf("Periodic config check failed: %v", err)
			}
		case <-a.shutdownSignal:
			return
		case <-ctx.Done():
			return
		}
	}
}
Best Practices
Atomic Updates: Always use atomic file operations (write to a temporary file, then rename) for configuration updates to prevent partial reads.
Graceful Shutdown: Always bound collector shutdown with a timeout to prevent indefinite hangs.
Error Capture: Capture and report collector errors for visibility into runtime issues.
State Protection: Use mutexes and atomic operations to protect shared state from race conditions.
Next Steps
Host Agent: Learn about host agent-specific lifecycle management
Kubernetes Agent: Understand Kubernetes agent lifecycle patterns
Configuration: Configure collector behavior and components
Troubleshooting: Resolve lifecycle and startup issues