Watchdog is composed of several specialized components that work together to provide reliable uptime monitoring. This page provides detailed information about each component’s implementation and responsibilities.

Orchestrator

The orchestrator is the main entry point and coordinator for the entire monitoring system. Location: orchestrator/orchestrator.go

Responsibilities

  • Initialize the system (logger, event bus, supervisor)
  • Register event listeners on the event bus
  • Create and manage parent worker groups for each monitoring interval
  • Prefill Redis with URL data from the database
  • Coordinate graceful shutdown

Key methods

// From orchestrator/orchestrator.go:34-58
func NewOrchestrator(ctx context.Context, rdC *redis.Client, pool *pgxpool.Pool) *Orchestrator {
    newLogger := logger.New()
    newEventBus := core.NewEventBus(newLogger)
    newEventBus.Subscribe("ping.successful", listeners.NewPingSuccessfulListener(ctx, newLogger, pool))
    newEventBus.Subscribe("ping.unsuccessful", listeners.NewPingUnSuccessfulListener(ctx, newLogger, pool))

    newSupervisor := supervisor.NewSupervisor(
        ctx,
        env.FetchInt("SUPERVISOR_POOL_FLUSH_BATCHSIZE", 100),
        time.Duration(env.FetchInt("SUPERVISOR_POOL_FLUSH_TIMEOUT", 5))*time.Second,
        newEventBus,
        pool,
    )

    return &Orchestrator{
        intervals:     make(map[int]*worker.ParentWorker),
        ctx:           ctx,
        RedisClient:   rdC,
        Supervisor:    newSupervisor,
        DB:            pool,
        UrlRepository: database.NewUrlRepository(pool),
        Logger:        newLogger,
        EventBus:      &newEventBus,
    }
}
The orchestrator uses a mutex-protected map (intervals) to safely manage worker groups across goroutines.
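The pattern can be sketched in isolation. The `intervalRegistry` type below is illustrative, not the actual `Orchestrator` struct: it shows a map guarded by a mutex so worker groups can be registered and looked up safely from multiple goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// intervalRegistry is a hypothetical stand-in for the orchestrator's
// intervals map: a plain map guarded by a mutex so that worker groups
// can be registered and looked up from multiple goroutines.
type intervalRegistry struct {
	mu        sync.Mutex
	intervals map[int]string // interval (seconds) -> worker group name
}

func newIntervalRegistry() *intervalRegistry {
	return &intervalRegistry{intervals: make(map[int]string)}
}

// Register stores a worker group for an interval; it returns false if
// a group already exists for that interval, so each interval gets
// exactly one parent worker.
func (r *intervalRegistry) Register(interval int, name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.intervals[interval]; exists {
		return false
	}
	r.intervals[interval] = name
	return true
}

func (r *intervalRegistry) Get(interval int) (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	name, ok := r.intervals[interval]
	return name, ok
}

func main() {
	reg := newIntervalRegistry()
	var wg sync.WaitGroup
	// Concurrent registration is safe: only one goroutine wins per interval.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			reg.Register(30, fmt.Sprintf("group-%d", n))
		}(i)
	}
	wg.Wait()
	_, ok := reg.Get(30)
	fmt.Println(ok) // true
}
```

Without the mutex, concurrent `Register` calls on the same map would be a data race in Go; the lock also gives the check-then-insert in `Register` atomicity.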

Configuration

The orchestrator reads these environment variables:
  • SUPERVISOR_POOL_FLUSH_BATCHSIZE: Number of tasks to batch before flushing (default: 100)
  • SUPERVISOR_POOL_FLUSH_TIMEOUT: Seconds to wait before flushing incomplete batches (default: 5)

Parent worker

Each parent worker manages a pool of child workers for a specific monitoring interval. Location: worker/parent_worker.go

Responsibilities

  • Spawn and manage a pool of child workers
  • Receive tick signals from the orchestrator
  • Fetch URL IDs from Redis for the assigned interval
  • Divide work into chunks and distribute to child workers
  • Forward work chunks through the work pool channel

Implementation details

// From worker/parent_worker.go:42-69
func (pw *ParentWorker) Work() {
    urlLength, err := pw.RedisClient.LLen(pw.Ctx, core.FormatRedisList(pw.Interval)).Result()
    if err != nil {
        log.Println(err)
        return
    }

    if urlLength < 1 {
        return
    }

    // Fetch all the ids in Redis
    urlIds, err := pw.RedisClient.LRange(pw.Ctx, core.FormatRedisList(pw.Interval), 0, urlLength).Result()
    if err != nil {
        log.Println(err)
        return
    }
    
    maxPoolSize := env.FetchInt("MAXIMUM_WORK_POOL_SIZE")
    for i := 0; i < len(urlIds); i += maxPoolSize {
        end := i + maxPoolSize
        if end > len(urlIds) {
            end = len(urlIds)
        }
        chunk := urlIds[i:end]
        pw.WorkPool <- chunk
    }
}
The parent worker chunks URL IDs based on MAXIMUM_WORK_POOL_SIZE to prevent overwhelming the work pool channel.
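The chunking step can be factored into a standalone function to see the arithmetic: with 250 IDs and a pool size of 100, the loop produces chunks of 100, 100, and 50. (The `chunk` helper below is illustrative; the production code inlines the loop as shown above.)

```go
package main

import "fmt"

// chunk splits ids into slices of at most size elements, mirroring the
// loop in ParentWorker.Work.
func chunk(ids []string, size int) [][]string {
	var chunks [][]string
	for i := 0; i < len(ids); i += size {
		end := i + size
		if end > len(ids) {
			end = len(ids)
		}
		chunks = append(chunks, ids[i:end])
	}
	return chunks
}

func main() {
	ids := make([]string, 250)
	for i := range ids {
		ids[i] = fmt.Sprintf("url-%d", i)
	}
	for _, c := range chunk(ids, 100) {
		fmt.Println(len(c))
	}
	// prints 100, 100, 50
}
```

Because the slices share the backing array of `ids`, no URL IDs are copied; only slice headers travel through the work pool channel.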

Configuration

  • MAXIMUM_CHILD_WORKERS: Number of child workers to spawn (must be > 0)
  • MAXIMUM_WORK_POOL_SIZE: Maximum size of work chunks distributed to children

Child worker

Child workers perform the actual HTTP checks and report results to the supervisor. Location: worker/child_worker.go

Responsibilities

  • Listen for work chunks from the parent worker’s work pool
  • Fetch full URL details from Redis hash storage
  • Perform HTTP requests with configured method and timeout
  • Evaluate HTTP response status codes
  • Submit check results to the supervisor as tasks

HTTP check logic

// From worker/child_worker.go:51-94
func (cw *ChildWorker) Work(urlId string) {
    client := &http.Client{
        Timeout: time.Duration(env.FetchInt("HTTP_REQUEST_TIMEOUT", 5)) * time.Second,
    }
    
    var url database.Url
    val, err := cw.ParentWorker.RedisClient.HGet(cw.Ctx, core.FormatRedisHash(cw.ParentWorker.Interval), urlId).Bytes()
    if err != nil {
        fmt.Println(err)
        return
    }

    err = url.UnmarshalBinary(val)
    if err != nil {
        return
    }

    request, err := http.NewRequest(url.HttpMethod.ToMethod(), url.Url, nil)
    if err != nil {
        fmt.Println(err)
        return
    }

    resp, err := client.Do(request)
    if err != nil {
        // Network error - mark as unhealthy
        task := supervisor.Task{
            Healthy: false,
            Url:     url.Url,
            UrlId:   url.Id,
        }
        cw.ParentWorker.Supervisor.WorkPool <- task
        return
    }
    defer resp.Body.Close()
    
    // Check status code - 2xx is healthy
    task := supervisor.Task{
        UrlId:   url.Id,
        Healthy: resp.StatusCode > 199 && resp.StatusCode < 300,
        Url:     url.Url,
    }

    cw.ParentWorker.Supervisor.WorkPool <- task
}
A URL is considered healthy if the HTTP status code is in the 2xx range (200-299). A network error or any non-2xx status code marks the URL as unhealthy.
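The status evaluation condenses to a single predicate. The `isHealthy` helper below is an illustrative extraction of the inline condition in `ChildWorker.Work`, not a function that exists in the codebase:

```go
package main

import "fmt"

// isHealthy mirrors the status-code check in ChildWorker.Work:
// only 2xx responses count as healthy.
func isHealthy(statusCode int) bool {
	return statusCode > 199 && statusCode < 300
}

func main() {
	fmt.Println(isHealthy(200)) // true
	fmt.Println(isHealthy(204)) // true
	fmt.Println(isHealthy(301)) // false: redirects are not 2xx
	fmt.Println(isHealthy(503)) // false
}
```

Note that a 3xx only reaches this check if the client does not follow the redirect; Go's default `http.Client` follows redirects automatically, so a redirecting URL is usually judged by its final response.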

Configuration

  • HTTP_REQUEST_TIMEOUT: Timeout in seconds for HTTP requests (default: 5)

Supervisor

The supervisor receives check results from workers, applies decision logic, and publishes domain events. Location: supervisor/supervisor.go

Responsibilities

  • Receive check results through a buffered work pool channel
  • Batch tasks for efficient processing
  • Apply decision logic to determine success or failure
  • Publish ping.successful or ping.unsuccessful events to the event bus
  • Implement timeout-based flushing for incomplete batches

Batching logic

// From supervisor/supervisor.go:33-56
func (s *Supervisor) Activate() {
    buffer := make([]Task, 0, s.BatchSize)
    ticker := time.NewTicker(s.Timeout)

    go func() {
        for {
            select {
            case <-s.ctx.Done():
                return
            case task := <-s.WorkPool:
                buffer = append(buffer, task)
                if len(buffer) >= s.BatchSize {
                    s.flush(buffer)
                    buffer = buffer[:0]
                }
            case <-ticker.C:
                if len(buffer) > 0 {
                    s.flush(buffer)
                    buffer = buffer[:0]
                }
            }
        }
    }()
}
The supervisor uses two flush triggers:
  1. Batch size: Flushes when buffer reaches SUPERVISOR_POOL_FLUSH_BATCHSIZE
  2. Timeout: Flushes incomplete batches after SUPERVISOR_POOL_FLUSH_TIMEOUT seconds

Event publishing

// From supervisor/supervisor.go:58-75
func (s *Supervisor) flush(buffer []Task) {
    for _, task := range buffer {
        if task.Healthy {
            s.EventBus.Dispatch(&events.PingSuccessful{
                UrlId:   task.UrlId,
                Healthy: task.Healthy,
                Url:     task.Url,
            })
        } else {
            s.EventBus.Dispatch(&events.PingUnSuccessful{
                UrlId:   task.UrlId,
                Healthy: task.Healthy,
                Url:     task.Url,
            })
        }
    }
}

Event bus

The event bus provides a lightweight pub/sub mechanism for decoupling event producers from consumers. Location: core/event_bus.go

Responsibilities

  • Manage subscriptions to event topics
  • Dispatch events to registered handlers
  • Execute handlers asynchronously in separate goroutines
  • Provide thread-safe subscription management

Implementation

// From core/event_bus.go:32-46
func (bus *EventBusImpl) Subscribe(eventName string, handler EventHandler) {
    bus.rwMutex.Lock()
    defer bus.rwMutex.Unlock()
    bus.handlers[eventName] = append(bus.handlers[eventName], handler)
    bus.Logger().Info(fmt.Sprintf("Subscribing to %s event", eventName))
}

func (bus *EventBusImpl) Dispatch(event Event) {
    bus.rwMutex.RLock()
    defer bus.rwMutex.RUnlock()
    for _, handler := range bus.handlers[event.Name()] {
        bus.Logger().Info(fmt.Sprintf("Dispatching %s event to listeners", event.Name()))
        go handler.Handle(event)
    }
}
Event handlers run asynchronously in separate goroutines, allowing the supervisor to continue processing without waiting for side effects to complete.

Event listeners

Listeners subscribe to events and handle side effects such as persistence and notifications.

Ping successful listener

Location: events/listeners/ping_successful_listener.go
Handles successful ping events:
  1. State transition detection: Checks if the URL was previously unhealthy
  2. Incident resolution: Marks incidents as resolved in the database
  3. Recovery notification: Sends “Site is UP” email to the contact address
  4. Metrics persistence: Stores the successful check in the url_statuses hypertable
  5. Status update: Updates the URL’s current status to healthy in the urls table
// From events/listeners/ping_successful_listener.go:31-47
if url.Status == enums.UnHealthy {
    incidentRepo := database.NewIncidentRepository(sl.DB)
    err := incidentRepo.Resolve(sl.ctx, url.Id)
    if err != nil {
        sl.logger.Error("Unable to log incident as resolved: ", err.Error(), url)
    }

    err = core.SendEmail(core.SendEmailConfig{
        Recipients:  []string{url.ContactEmail},
        Subject:     "Your Site is now UP",
        Content:     fmt.Sprintf("Your Site `%v` is UP. It went up at %v. Good work", url.Url, time.Now()),
        ContentType: "text/plain",
    })
    if err != nil {
        sl.logger.Error("Error sending monitoring alert email: ", err, e)
    }
}

Ping unsuccessful listener

Location: events/listeners/ping_unsuccessful_listener.go
Handles unsuccessful ping events:
  1. State transition detection: Checks if the URL was previously healthy
  2. Incident creation: Logs a new incident in the database
  3. Downtime notification: Sends “Site is DOWN” email to the contact address
  4. Status update: Updates the URL’s current status to unhealthy in the urls table
  5. Metrics persistence: Stores the failed check in the url_statuses hypertable
// From events/listeners/ping_unsuccessful_listener.go:33-49
if url.Status == enums.Healthy {
    incidentRepo := database.NewIncidentRepository(sl.DB)
    err := incidentRepo.Add(sl.ctx, url.Id)
    if err != nil {
        sl.logger.Error("Unable to log incident: ", err.Error(), url)
    }

    err = core.SendEmail(core.SendEmailConfig{
        Recipients:  []string{url.ContactEmail},
        Subject:     "Your Site is DOWN",
        Content:     fmt.Sprintf("Your Site `%v` is DOWN. It went down at %v\n . Please check it out", url.Url, time.Now()),
        ContentType: "text/plain",
    })
    if err != nil {
        sl.logger.Error("Error sending monitoring alert email: ", err, e)
    }
}
Notifications are only sent on state transitions (healthy → unhealthy or unhealthy → healthy), not on every check. This prevents notification spam.
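The transition rule reduces to comparing the stored status against the latest check result. The `shouldNotify` helper below is an illustrative reduction; the listeners implement it by branching on `url.Status` inside the two event handlers shown above:

```go
package main

import "fmt"

// shouldNotify captures the state-transition rule: an alert is
// warranted only when the stored status and the latest check disagree.
func shouldNotify(storedHealthy, checkHealthy bool) bool {
	return storedHealthy != checkHealthy
}

func main() {
	fmt.Println(shouldNotify(true, false))  // healthy -> unhealthy: notify
	fmt.Println(shouldNotify(false, true))  // unhealthy -> healthy: notify
	fmt.Println(shouldNotify(true, true))   // still healthy: no email
	fmt.Println(shouldNotify(false, false)) // still down: no repeat alert
}
```

A site that stays down for an hour of one-minute checks therefore triggers one "DOWN" email at the first failure and one "UP" email at recovery, not sixty alerts.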

Database repositories

Repositories encapsulate SQL operations for data persistence.

URL repository

Location: database/url_repository.go
Provides methods for:
  • Add: Insert new monitored URLs
  • FindById: Fetch URL by ID
  • FetchAll: List URLs with filtering and pagination
  • UpdateStatus: Update URL health status
  • Remove: Delete a URL

URL status repository

Location: database/url_status_repository.go
Provides methods for time-series data:
  • Add: Insert status check results into the hypertable
  • GetRecentStatus: Fetch most recent status matching a condition
  • GetLastStatus: Fetch the most recent status regardless of value
// From database/url_status_repository.go:17-24
func (ur urlStatusRepository) Add(ctx context.Context, urlId int, status bool) error {
    sql := "INSERT INTO url_statuses (time, url_id,status) VALUES (NOW(), $1,$2)"

    _, err := ur.pool.Exec(ctx, sql, urlId, status)
    if err != nil {
        return err
    }
    return nil
}

Next steps

Architecture overview

Understand the high-level system design

Event flow

Follow the complete event-driven workflow
