Watchdog is composed of several specialized components that work together to provide reliable uptime monitoring. This page provides detailed information about each component’s implementation and responsibilities.
Orchestrator
The orchestrator is the main entry point and coordinator for the entire monitoring system.
Location: orchestrator/orchestrator.go
Responsibilities
Initialize the system (logger, event bus, supervisor)
Register event listeners on the event bus
Create and manage parent worker groups for each monitoring interval
Prefill Redis with URL data from the database
Coordinate graceful shutdown
Key methods
```go
// From orchestrator/orchestrator.go:34-58
func NewOrchestrator(ctx context.Context, rdC *redis.Client, pool *pgxpool.Pool) *Orchestrator {
	newLogger := logger.New()
	newEventBus := core.NewEventBus(newLogger)
	newEventBus.Subscribe("ping.successful", listeners.NewPingSuccessfulListener(ctx, newLogger, pool))
	newEventBus.Subscribe("ping.unsuccessful", listeners.NewPingUnSuccessfulListener(ctx, newLogger, pool))
	newSupervisor := supervisor.NewSupervisor(
		ctx,
		env.FetchInt("SUPERVISOR_POOL_FLUSH_BATCHSIZE", 100),
		time.Duration(env.FetchInt("SUPERVISOR_POOL_FLUSH_TIMEOUT", 5))*time.Second,
		newEventBus,
		pool,
	)
	return &Orchestrator{
		intervals:     make(map[int]*worker.ParentWorker),
		ctx:           ctx,
		RedisClient:   rdC,
		Supervisor:    newSupervisor,
		DB:            pool,
		UrlRepository: database.NewUrlRepository(pool),
		Logger:        newLogger,
		EventBus:      &newEventBus,
	}
}
```
The orchestrator uses a mutex-protected map (intervals) to safely manage worker groups across goroutines.
Configuration
The orchestrator reads these environment variables:
SUPERVISOR_POOL_FLUSH_BATCHSIZE: Number of tasks to batch before flushing (default: 100)
SUPERVISOR_POOL_FLUSH_TIMEOUT: Seconds to wait before flushing incomplete batches (default: 5)
Parent worker
Each parent worker manages a pool of child workers for a specific monitoring interval.
Location: worker/parent_worker.go
Responsibilities
Spawn and manage a pool of child workers
Receive tick signals from the orchestrator
Fetch URL IDs from Redis for the assigned interval
Divide work into chunks and distribute to child workers
Forward work chunks through the work pool channel
Implementation details
```go
// From worker/parent_worker.go:42-69
func (pw *ParentWorker) Work() {
	urlLength, err := pw.RedisClient.LLen(pw.Ctx, core.FormatRedisList(pw.Interval)).Result()
	if err != nil {
		log.Println(err)
		return
	}
	if urlLength < 1 {
		return
	}
	// Fetch all the ids in Redis
	urlIds, err := pw.RedisClient.LRange(pw.Ctx, core.FormatRedisList(pw.Interval), 0, urlLength).Result()
	if err != nil {
		log.Println(err)
		return
	}
	maxPoolSize := env.FetchInt("MAXIMUM_WORK_POOL_SIZE")
	for i := 0; i < len(urlIds); i += maxPoolSize {
		end := i + maxPoolSize
		if end > len(urlIds) {
			end = len(urlIds)
		}
		chunk := urlIds[i:end]
		pw.WorkPool <- chunk
	}
}
```
The parent worker chunks URL IDs based on MAXIMUM_WORK_POOL_SIZE to prevent overwhelming the work pool channel.
Configuration
MAXIMUM_CHILD_WORKERS: Number of child workers to spawn (must be > 0)
MAXIMUM_WORK_POOL_SIZE: Maximum size of work chunks distributed to children
Child worker
Child workers perform the actual HTTP checks and report results to the supervisor.
Location: worker/child_worker.go
Responsibilities
Listen for work chunks from the parent worker’s work pool
Fetch full URL details from Redis hash storage
Perform HTTP requests with configured method and timeout
Evaluate HTTP response status codes
Submit check results to the supervisor as tasks
HTTP check logic
```go
// From worker/child_worker.go:51-94
func (cw *ChildWorker) Work(urlId string) {
	client := &http.Client{
		Timeout: time.Duration(env.FetchInt("HTTP_REQUEST_TIMEOUT", 5)) * time.Second,
	}
	var url database.Url
	val, err := cw.ParentWorker.RedisClient.HGet(cw.Ctx, core.FormatRedisHash(cw.ParentWorker.Interval), urlId).Bytes()
	if err != nil {
		fmt.Println(err)
		return
	}
	err = url.UnmarshalBinary(val)
	if err != nil {
		return
	}
	request, err := http.NewRequest(url.HttpMethod.ToMethod(), url.Url, nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	resp, err := client.Do(request)
	if err != nil {
		// Network error - mark as unhealthy
		task := supervisor.Task{
			Healthy: false,
			Url:     url.Url,
			UrlId:   url.Id,
		}
		cw.ParentWorker.Supervisor.WorkPool <- task
		return
	}
	defer resp.Body.Close()
	// Check status code - 2xx is healthy
	task := supervisor.Task{
		UrlId:   url.Id,
		Healthy: resp.StatusCode > 199 && resp.StatusCode < 300,
		Url:     url.Url,
	}
	cw.ParentWorker.Supervisor.WorkPool <- task
}
```
A URL is considered healthy if the HTTP status code is in the 2xx range (200-299). Network errors or other status codes mark the URL as unhealthy.
Configuration
HTTP_REQUEST_TIMEOUT: Timeout in seconds for HTTP requests (default: 5)
Supervisor
The supervisor receives check results from workers, applies decision logic, and publishes domain events.
Location: supervisor/supervisor.go
Responsibilities
Receive check results through a buffered work pool channel
Batch tasks for efficient processing
Apply decision logic to determine success or failure
Publish ping.successful or ping.unsuccessful events to the event bus
Implement timeout-based flushing for incomplete batches
Batching logic
```go
// From supervisor/supervisor.go:33-56
func (s *Supervisor) Activate() {
	buffer := make([]Task, 0, s.BatchSize)
	ticker := time.NewTicker(s.Timeout)
	go func() {
		for {
			select {
			case <-s.ctx.Done():
				return
			case task := <-s.WorkPool:
				buffer = append(buffer, task)
				if len(buffer) >= s.BatchSize {
					s.flush(buffer)
					buffer = buffer[:0]
				}
			case <-ticker.C:
				if len(buffer) > 0 {
					s.flush(buffer)
					buffer = buffer[:0]
				}
			}
		}
	}()
}
```
The supervisor uses two flush triggers:
Batch size : Flushes when buffer reaches SUPERVISOR_POOL_FLUSH_BATCHSIZE
Timeout : Flushes incomplete batches after SUPERVISOR_POOL_FLUSH_TIMEOUT seconds
Event publishing
```go
// From supervisor/supervisor.go:58-75
func (s *Supervisor) flush(buffer []Task) {
	for _, task := range buffer {
		if task.Healthy {
			s.EventBus.Dispatch(&events.PingSuccessful{
				UrlId:   task.UrlId,
				Healthy: task.Healthy,
				Url:     task.Url,
			})
		} else {
			s.EventBus.Dispatch(&events.PingUnSuccessful{
				UrlId:   task.UrlId,
				Healthy: task.Healthy,
				Url:     task.Url,
			})
		}
	}
}
```
Event bus
The event bus provides a lightweight pub/sub mechanism for decoupling event producers from consumers.
Location: core/event_bus.go
Responsibilities
Manage subscriptions to event topics
Dispatch events to registered handlers
Execute handlers asynchronously in separate goroutines
Provide thread-safe subscription management
Implementation
```go
// From core/event_bus.go:32-46
func (bus *EventBusImpl) Subscribe(eventName string, handler EventHandler) {
	bus.rwMutex.Lock()
	defer bus.rwMutex.Unlock()
	bus.handlers[eventName] = append(bus.handlers[eventName], handler)
	bus.Logger().Info(fmt.Sprintf("Subscribing to %s event", eventName))
}

func (bus *EventBusImpl) Dispatch(event Event) {
	bus.rwMutex.RLock()
	defer bus.rwMutex.RUnlock()
	for _, handler := range bus.handlers[event.Name()] {
		bus.Logger().Info(fmt.Sprintf("Dispatching %s event to listeners", event.Name()))
		go handler.Handle(event)
	}
}
```
Event handlers run asynchronously in separate goroutines, allowing the supervisor to continue processing without waiting for side effects to complete.
Event listeners
Listeners subscribe to events and handle side effects such as persistence and notifications.
Ping successful listener
Location: events/listeners/ping_successful_listener.go
Handles successful ping events:
State transition detection : Checks if the URL was previously unhealthy
Incident resolution : Marks incidents as resolved in the database
Recovery notification : Sends “Site is UP” email to the contact address
Metrics persistence : Stores the successful check in the url_statuses hypertable
Status update : Updates the URL’s current status to healthy in the urls table
```go
// From events/listeners/ping_successful_listener.go:31-47
if url.Status == enums.UnHealthy {
	incidentRepo := database.NewIncidentRepository(sl.DB)
	err := incidentRepo.Resolve(sl.ctx, url.Id)
	if err != nil {
		sl.logger.Error("Unable to log incident as resolved: ", err.Error(), url)
	}
	err = core.SendEmail(core.SendEmailConfig{
		Recipients:  []string{url.ContactEmail},
		Subject:     "Your Site is now UP",
		Content:     fmt.Sprintf("Your Site `%v` is UP. It went up at %v. Good work", url.Url, time.Now()),
		ContentType: "text/plain",
	})
	if err != nil {
		sl.logger.Error("Error sending monitoring alert email: ", err, e)
	}
}
```
Ping unsuccessful listener
Location: events/listeners/ping_unsuccessful_listener.go
Handles unsuccessful ping events:
State transition detection : Checks if the URL was previously healthy
Incident creation : Logs a new incident in the database
Downtime notification : Sends “Site is DOWN” email to the contact address
Status update : Updates the URL’s current status to unhealthy in the urls table
Metrics persistence : Stores the failed check in the url_statuses hypertable
```go
// From events/listeners/ping_unsuccessful_listener.go:33-49
if url.Status == enums.Healthy {
	incidentRepo := database.NewIncidentRepository(sl.DB)
	err := incidentRepo.Add(sl.ctx, url.Id)
	if err != nil {
		sl.logger.Error("Unable to log incident: ", err.Error(), url)
	}
	err = core.SendEmail(core.SendEmailConfig{
		Recipients:  []string{url.ContactEmail},
		Subject:     "Your Site is DOWN",
		Content:     fmt.Sprintf("Your Site `%v` is DOWN. It went down at %v\n. Please check it out", url.Url, time.Now()),
		ContentType: "text/plain",
	})
	if err != nil {
		sl.logger.Error("Error sending monitoring alert email: ", err, e)
	}
}
```
Notifications are only sent on state transitions (healthy → unhealthy or unhealthy → healthy), not on every check. This prevents notification spam.
Database repositories
Repositories encapsulate SQL operations for data persistence.
URL repository
Location: database/url_repository.go
Provides methods for:
Add: Insert new monitored URLs
FindById: Fetch URL by ID
FetchAll: List URLs with filtering and pagination
UpdateStatus: Update URL health status
Remove: Delete a URL
URL status repository
Location: database/url_status_repository.go
Provides methods for time-series data:
Add: Insert status check results into the hypertable
GetRecentStatus: Fetch most recent status matching a condition
GetLastStatus: Fetch the most recent status regardless of value
```go
// From database/url_status_repository.go:17-24
func (ur urlStatusRepository) Add(ctx context.Context, urlId int, status bool) error {
	sql := "INSERT INTO url_statuses (time, url_id, status) VALUES (NOW(), $1, $2)"
	_, err := ur.pool.Exec(ctx, sql, urlId, status)
	if err != nil {
		return err
	}
	return nil
}
```
Next steps
Architecture overview: understand the high-level system design
Event flow: follow the complete event-driven workflow