Overview
The Host Agent runs as a system service on Linux or Windows hosts, managing the lifecycle of an OpenTelemetry Collector instance. It provides automated configuration management, remote updates, and health monitoring without requiring direct SSH access to the host.
Architecture Components
The host agent architecture consists of three main layers:
Service Layer
The agent runs as a native system service using the kardianos/service library, which provides cross-platform service management:
On Linux:

```shell
systemctl status kmagent
systemctl start kmagent
systemctl stop kmagent
```

The service is registered as kmagent.service and runs with appropriate user permissions.

On Windows, the agent runs as a Windows Service with configurable credentials:

```shell
sc query kmagent
net start kmagent
net stop kmagent
```
```go
svcConfig := &service.Config{
	Name:        "kmagent",
	DisplayName: "KloudMate Agent",
	Description: "KloudMate Agent for OpenTelemetry auto instrumentation",
}
svc, err := service.New(p, svcConfig)
```
Agent Layer
The agent layer manages the collector lifecycle and configuration updates:
Agent Initialization
```go
func New(cfg *config.Config, logger *zap.SugaredLogger, opts ...Option) (*Agent, error) {
	configUpdater := updater.NewConfigUpdater(cfg, logger)
	a := Agent{
		cfg:            cfg,
		logger:         logger,
		updater:        configUpdater,
		shutdownSignal: make(chan struct{}),
	}
	return &a, nil
}
```
Collector Layer
The OpenTelemetry Collector is created using the factory pattern with platform-specific components:
internal/agent/collector.go
```go
func NewCollector(c *config.Config) (*otelcol.Collector, error) {
	collectorSettings := shared.CollectorInfoFactory(c.OtelConfigPath)
	return otelcol.NewCollector(collectorSettings)
}
```
internal/shared/collector_info.go
```go
func CollectorInfoFactory(cfgPath string) otelcol.CollectorSettings {
	info := component.BuildInfo{
		Command:     "kmagent",
		Description: "KloudMate Agent for OpenTelemetry",
		Version:     version.GetCollectorVersion(),
	}
	return otelcol.CollectorSettings{
		BuildInfo:               info,
		Factories:               Components,
		DisableGracefulShutdown: true,
		ConfigProviderSettings: otelcol.ConfigProviderSettings{
			ResolverSettings: confmap.ResolverSettings{
				DefaultScheme: "env",
				URIs:          []string{cfgPath},
				ProviderFactories: []confmap.ProviderFactory{
					envprovider.NewFactory(),
					fileprovider.NewFactory(),
					yamlprovider.NewFactory(),
				},
			},
		},
		SkipSettingGRPCLogger: true,
	}
}
```
Lifecycle Management
The agent manages the collector through distinct lifecycle phases:
Initialization
The Program structure is initialized with configuration from CLI flags, environment variables, and config files.

```go
func (p *Program) Initialize(c *cli.Context) error {
	p.ctx, p.cancelFunc = context.WithCancel(context.Background())

	// Load configuration
	err := p.cfg.LoadConfig()
	if err != nil {
		return fmt.Errorf("failed to load configuration: %v", err)
	}

	// Create agent
	p.kmAgent, err = agent.New(p.cfg, p.logger, agent.WithVersion(p.version))
	if err != nil {
		return fmt.Errorf("failed to create agent: %v", err)
	}
	return nil
}
```
Service Start
The service layer calls the Start method, which launches two goroutines:
Collector lifecycle manager
Configuration update checker
```go
func (p *Program) Start(s service.Service) error {
	p.logger.Info("Starting service")
	p.wg.Add(1)
	go p.run()
	return nil
}
```
Collector Creation
The agent creates a new collector instance with the current configuration.

```go
func (a *Agent) manageCollectorLifecycle(ctx context.Context) error {
	collector, err := NewCollector(a.cfg)
	if err != nil {
		return fmt.Errorf("failed to create new collector instance: %w", err)
	}
	a.collectorMu.Lock()
	if !a.isRunning.Load() {
		a.collectorMu.Unlock()
		return nil
	}
	a.collector = collector
	a.collectorMu.Unlock()
	runErr := collector.Run(ctx)
	return runErr
}
```
Configuration Watching
A ticker periodically checks for configuration updates from the remote API.

```go
func (a *Agent) runConfigUpdateChecker(ctx context.Context) {
	ticker := time.NewTicker(time.Duration(a.cfg.ConfigCheckInterval) * time.Second)
	defer ticker.Stop()

	// Trigger the very first config check
	if err := a.performConfigCheck(ctx); err != nil {
		a.logger.Errorf("Periodic config check failed: %v", err)
	}

	for {
		select {
		case <-ticker.C:
			if err := a.performConfigCheck(ctx); err != nil {
				a.logger.Errorf("Periodic config check failed: %v", err)
			}
		case <-a.shutdownSignal:
			return
		case <-ctx.Done():
			return
		}
	}
}
```
Graceful Shutdown
When the service stops, the agent gracefully shuts down the collector and waits for all goroutines to complete.

```go
func (p *Program) Stop(s service.Service) error {
	if p.cancelFunc != nil {
		p.cancelFunc()
	}
	if p.kmAgent != nil {
		shutdownCtx, shutdownCancel := context.WithTimeout(
			context.Background(), 30*time.Second)
		defer shutdownCancel()
		if err := p.kmAgent.Shutdown(shutdownCtx); err != nil {
			p.logger.Errorf("Error during agent shutdown: %v", err)
		}
	}
	p.wg.Wait()
	return nil
}
```
Configuration Update Flow
The agent pulls configuration updates from the remote API through a multi-step flow:
Status Collection
The agent collects current status information about itself and the collector.

```go
a.collectorMu.Lock()
params := updater.UpdateCheckerParams{
	Version: a.version,
}
if a.collector != nil {
	params.CollectorStatus = "Running"
} else {
	params.CollectorStatus = "Stopped"
	params.CollectorLastError = a.collectorError
}
a.collectorMu.Unlock()

if a.isRunning.Load() {
	params.AgentStatus = "Running"
} else {
	params.AgentStatus = "Stopped"
}
```
Remote API Call
The updater sends a POST request to the remote API with status information.

internal/updater/updater.go

```go
data := map[string]interface{}{
	"is_docker":          u.cfg.DockerMode,
	"hostname":           u.cfg.Hostname(),
	"platform":           platform,
	"architecture":       runtime.GOARCH,
	"agent_version":      p.Version,
	"collector_version":  version.GetCollectorVersion(),
	"agent_status":       p.AgentStatus,
	"collector_status":   p.CollectorStatus,
	"last_error_message": p.CollectorLastError,
}
req, err := http.NewRequestWithContext(
	reqCtx, "POST", u.cfg.ConfigUpdateURL, bytes.NewBuffer(jsonData))
if err != nil {
	return false, nil, err
}
req.Header.Set("Authorization", u.cfg.APIKey)
```
Response Processing
The API returns whether a restart is required and the new configuration.

internal/updater/updater.go

```go
type ConfigUpdateResponse struct {
	RestartRequired bool                   `json:"restart_required"`
	Config          map[string]interface{} `json:"config"`
}

var updateResp ConfigUpdateResponse
if err := json.NewDecoder(resp.Body).Decode(&updateResp); err != nil {
	return false, nil, fmt.Errorf("failed to decode: %w", err)
}
return updateResp.RestartRequired, updateResp.Config, nil
```
Atomic Update
If a restart is required, the agent writes the new configuration atomically.

```go
func (a *Agent) UpdateConfig(_ context.Context, newConfig map[string]interface{}) error {
	configYAML, err := yaml.Marshal(newConfig)
	if err != nil {
		return fmt.Errorf("failed to marshal new config: %w", err)
	}
	tempFile := a.cfg.OtelConfigPath + ".new"
	if err := os.WriteFile(tempFile, configYAML, 0644); err != nil {
		return fmt.Errorf("failed to write new config: %w", err)
	}
	if err := os.Rename(tempFile, a.cfg.OtelConfigPath); err != nil {
		return fmt.Errorf("failed to replace config file: %w", err)
	}
	return nil
}
```
Collector Restart
The agent stops the current collector and starts a new instance with the updated configuration.

```go
if newConfig != nil && restart {
	if err := a.UpdateConfig(ctx, newConfig); err != nil {
		return fmt.Errorf("failed to update config file: %w", err)
	}
	a.stopCollectorInstance()
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		if err := a.manageCollectorLifecycle(agentCtx); err != nil {
			a.collectorError = err.Error()
		} else {
			a.collectorError = ""
		}
	}()
}
```
Thread Safety
The agent uses multiple synchronization primitives to ensure thread-safe operation:
Atomic Boolean
Mutex Protection
WaitGroup Coordination
```go
// Agent running state
isRunning atomic.Bool

// Thread-safe compare-and-swap
if !a.isRunning.CompareAndSwap(false, true) {
	return fmt.Errorf("agent already running")
}
```
Docker Mode
When running in Docker mode, the agent enables specialized configuration:
internal/config/config.go
```go
type Config struct {
	DockerMode     bool
	DockerEndpoint string
	// ... other fields
}

func GetDockerConfigPath() string {
	return "/etc/kmagent/config.yaml"
}
```
The Docker agent uses volume mounts to access host resources:
```shell
docker run -d \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -v /var/log:/var/log:ro \
  -v /proc:/host/proc:ro \
  -v /sys:/host/sys:ro \
  -e KM_API_KEY="your-api-key" \
  -e KM_DOCKER_MODE=true \
  kloudmate/km-agent
```
Configuration Paths
The agent uses platform-specific configuration paths:
internal/config/config.go
```go
func GetDefaultConfigPath() string {
	if runtime.GOOS == "windows" {
		execPath, _ := os.Executable()
		return filepath.Join(filepath.Dir(execPath), "config.yaml")
	} else if runtime.GOOS == "darwin" {
		return "/Library/Application Support/kmagent/config.yaml"
	} else {
		// Linux/Unix
		return "/etc/kmagent/config.yaml"
	}
}
```
The configuration directory is automatically created if it doesn’t exist, with appropriate permissions (0755).
Logging
The host agent uses structured logging with rotation:
```go
func getFileLogger(logFile string) (*zap.SugaredLogger, error) {
	writer := zapcore.AddSync(&lumberjack.Logger{
		Filename:   logFile,
		MaxSize:    10, // MB
		MaxBackups: 1,
		MaxAge:     7, // days
		Compress:   true,
	})
	// encoder and encoderConfig are constructed elsewhere in this file
	core := zapcore.NewTee(
		zapcore.NewCore(encoder, writer, zapcore.InfoLevel),
		zapcore.NewCore(zapcore.NewConsoleEncoder(encoderConfig),
			zapcore.AddSync(os.Stdout), zapcore.DebugLevel),
	)
	logger := zap.New(core)
	return logger.Sugar(), nil
}
```
Error Handling
The agent implements comprehensive error handling:
Collector Startup Failures
If the collector fails to start, the error is captured and reported to the remote API on the next status check.

```go
runErr := collector.Run(ctx)
if runErr != nil {
	a.collectorError = runErr.Error()
	a.logger.Errorw("collector run loop exited with error", "error", runErr)
}
```
Configuration Update Failures
Failed configuration updates are logged but don't crash the agent. The collector continues with the previous configuration.

```go
if err := a.UpdateConfig(ctx, newConfig); err != nil {
	a.collectorError = err.Error()
	return fmt.Errorf("failed to update config file: %w", err)
}
```
Network errors during configuration checks are logged but don't prevent future checks.

internal/updater/updater.go

```go
if respErr != nil {
	return false, nil, fmt.Errorf("failed to fetch config updates: %w", respErr)
}
```
Next Steps
Collector Lifecycle Deep dive into collector lifecycle management and restart mechanisms
Kubernetes Agent Learn about the Kubernetes deployment architecture
Configuration Configure the host agent for your environment
Deployment Deploy the host agent on Linux systems