Skip to main content
A worker pool is a pattern for distributing work across multiple goroutines. This allows you to control concurrency, limit resource usage, and efficiently process tasks in parallel.

Basic Worker Pool

package main

import (
	"fmt"
	"time"
)

// worker consumes jobs until the channel is closed, simulating one
// second of work per job, and sends j*2 on results for each job.
func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Println("worker", id, "started  job", j)
		time.Sleep(time.Second) // stand-in for an expensive task
		fmt.Println("worker", id, "finished job", j)
		results <- j * 2
	}
}

// main demonstrates a fixed-size worker pool: 3 workers share 5 jobs
// over buffered channels, so the batch takes ~2s instead of ~5s.
func main() {
	const numJobs = 5
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// Start 3 workers; they block until jobs arrive.
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}

	// Send jobs; the buffer holds all of them, so this never blocks.
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // lets each worker's range loop terminate

	// Collect results: receiving numJobs values also ensures main does
	// not exit before all work is finished (no WaitGroup needed here).
	for a := 1; a <= numJobs; a++ {
		<-results
	}
}
Workers process jobs concurrently from the jobs channel and send results to the results channel. The pool size (3 workers) limits concurrency.

Why Use Worker Pools?

1. Control Concurrency

Limit the number of concurrent operations:
// Without pool: Unbounded concurrency
for _, task := range tasks {
	go process(task)  // Could spawn millions of goroutines!
}

// With pool: Controlled concurrency
for _, task := range tasks {
	jobs <- task  // Only N workers process at once
}

2. Resource Management

Limit resource consumption (memory, connections, CPU):
// 10 workers = max 10 concurrent database connections
for w := 1; w <= 10; w++ {
	go worker(w, jobs, results)
}

3. Backpressure

Buffered job channel provides natural backpressure:
jobs := make(chan Job, 100)  // Buffer 100 jobs
// Sender blocks when buffer full

Worker Pool Patterns

1. Fixed-Size Pool

// WorkerPool runs a fixed number of workers that consume Jobs and
// produce Results over buffered channels.
type WorkerPool struct {
	numWorkers int
	jobs       chan Job
	results    chan Result
	wg         sync.WaitGroup
}

// NewWorkerPool returns a pool that will run numWorkers workers once
// Start is called. Both channels are buffered to 100 entries.
func NewWorkerPool(numWorkers int) *WorkerPool {
	wp := &WorkerPool{
		numWorkers: numWorkers,
		jobs:       make(chan Job, 100),
		results:    make(chan Result, 100),
	}
	return wp
}

// Start launches the workers. Each one is registered with the
// WaitGroup so Close can wait for them to drain.
func (wp *WorkerPool) Start() {
	for id := 0; id < wp.numWorkers; id++ {
		wp.wg.Add(1)
		go wp.worker(id)
	}
}

// worker processes jobs until the jobs channel is closed and empty.
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()

	for job := range wp.jobs {
		wp.results <- job.Process()
	}
}

// Submit enqueues a job; it blocks while the jobs buffer is full.
func (wp *WorkerPool) Submit(job Job) {
	wp.jobs <- job
}

// Close stops intake, waits for in-flight work to finish, then closes
// results so consumers ranging over Results terminate.
func (wp *WorkerPool) Close() {
	close(wp.jobs)
	wp.wg.Wait()
	close(wp.results)
}

// Results exposes the output channel for consumers.
func (wp *WorkerPool) Results() <-chan Result {
	return wp.results
}

2. Dynamic Pool with Timeout

// DynamicPool spawns workers on demand up to maxWorkers; a worker that
// sits idle for idleTimeout exits, shrinking the pool again.
type DynamicPool struct {
	jobs       chan Job
	results    chan Result
	workers    atomic.Int32 // count of currently live workers
	maxWorkers int          // upper bound on concurrent workers
	idleTimeout time.Duration // idle period after which a worker exits
}

// worker processes jobs until the channel closes or it has been idle
// for idleTimeout. The caller (Submit) accounts for this worker in
// dp.workers before spawning it; the deferred Add(-1) frees the slot.
func (dp *DynamicPool) worker() {
	defer dp.workers.Add(-1)

	timer := time.NewTimer(dp.idleTimeout)
	defer timer.Stop()

	for {
		// Re-arm the idle timer for each wait. NOTE(review): Reset on a
		// running timer is only fully safe as of Go 1.23 — confirm the
		// module's minimum Go version.
		timer.Reset(dp.idleTimeout)
		select {
		case job, ok := <-dp.jobs:
			if !ok {
				return // pool shut down
			}
			dp.results <- job.Process()
		case <-timer.C:
			return // idle timeout — shrink the pool
		}
	}
}

// Submit enqueues job, first spawning a new worker when the pool is
// below maxWorkers. The slot is reserved with a CAS *before* the
// goroutine starts; the previous check-then-spawn (which incremented
// the counter inside the new goroutine) let a burst of concurrent
// Submits observe a stale count and overshoot maxWorkers.
func (dp *DynamicPool) Submit(job Job) {
	for {
		n := dp.workers.Load()
		if n >= int32(dp.maxWorkers) {
			break // already at capacity
		}
		if dp.workers.CompareAndSwap(n, n+1) {
			go dp.worker()
			break
		}
		// Lost the race to another Submit — re-check capacity.
	}
	dp.jobs <- job
}

3. Pool with Error Handling

// Result pairs a job's value with the error (if any) from producing
// it, letting workers report failures without a second channel.
type Result struct {
	Value interface{} // payload on success
	Error error       // non-nil when the job failed
}

// worker executes jobs and reports each outcome — value and error
// together — as a single Result on the results channel.
func worker(id int, jobs <-chan Job, results chan<- Result) {
	for job := range jobs {
		v, err := job.Execute()
		r := Result{Value: v, Error: err}
		results <- r
	}
}

// processResults receives exactly count results, dispatching successes
// to handleSuccess and collecting failures. It returns nil when every
// result succeeded; otherwise an error wrapping the first failure.
func processResults(results <-chan Result, count int) error {
	var errs []error

	for i := 0; i < count; i++ {
		result := <-results
		if result.Error != nil {
			errs = append(errs, result.Error)
		} else {
			handleSuccess(result.Value)
		}
	}

	if len(errs) > 0 {
		// Previously only the count was reported and the underlying
		// errors were discarded; wrap the first so callers can use
		// errors.Is/As to inspect a concrete cause.
		return fmt.Errorf("encountered %d errors, first: %w", len(errs), errs[0])
	}
	return nil
}

4. Pool with Context

// worker processes jobs until ctx is canceled or jobs is closed. The
// inner select guards the result send as well, so a worker blocked on
// a full results channel still unblocks promptly on cancellation.
func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result) {
	for {
		select {
		case <-ctx.Done():
			return // canceled while waiting for work
		case job, ok := <-jobs:
			if !ok {
				return // no more jobs
			}
			result := job.Execute(ctx)
			select {
			case results <- result:
			case <-ctx.Done():
				return // canceled while delivering the result
			}
		}
	}
}

// RunPool processes all jobs with numWorkers workers and returns the
// collected results; cancellation is honored by the workers via ctx.
func RunPool(ctx context.Context, numWorkers int, jobs []Job) []Result {
	// Both buffers are sized to the job count, so sending below never
	// blocks even if workers are slow or canceled.
	jobCh := make(chan Job, len(jobs))
	resultCh := make(chan Result, len(jobs))

	// Launch the workers.
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		id := i
		go func() {
			defer wg.Done()
			worker(ctx, id, jobCh, resultCh)
		}()
	}

	// Queue everything, then signal end-of-input.
	for _, j := range jobs {
		jobCh <- j
	}
	close(jobCh)

	// Once every worker has returned, closing resultCh lets the
	// collection loop below terminate.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	var results []Result
	for r := range resultCh {
		results = append(results, r)
	}
	return results
}

Practical Examples

HTTP Request Pool

// URLJob is a unit of work: one URL to fetch.
type URLJob struct {
	URL string
}

// URLResult carries the outcome of fetching a URL. On failure, Error
// is non-nil; Status may still be set when the response arrived but
// its body could not be read.
type URLResult struct {
	URL    string
	Status int
	Body   string
	Error  error
}

// httpWorker fetches each URL from jobs using a shared client with a
// 10s timeout and sends one URLResult per job. It exits when jobs is
// closed and drained.
func httpWorker(id int, jobs <-chan URLJob, results chan<- URLResult) {
	client := &http.Client{Timeout: 10 * time.Second}

	for job := range jobs {
		resp, err := client.Get(job.URL)
		if err != nil {
			results <- URLResult{URL: job.URL, Error: err}
			continue
		}

		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			// Previously this error was discarded with `body, _ :=`,
			// silently producing a truncated or empty Body; surface it.
			results <- URLResult{URL: job.URL, Status: resp.StatusCode, Error: err}
			continue
		}

		results <- URLResult{
			URL:    job.URL,
			Status: resp.StatusCode,
			Body:   string(body),
		}
	}
}

// fetchURLs downloads all URLs using numWorkers concurrent workers and
// returns one URLResult per input (ordered by completion, not input).
func fetchURLs(urls []string, numWorkers int) []URLResult {
	jobs := make(chan URLJob, len(urls))
	results := make(chan URLResult, len(urls))

	// Launch the pool.
	for id := 1; id <= numWorkers; id++ {
		go httpWorker(id, jobs, results)
	}

	// Queue every URL, then signal end-of-input so workers can exit.
	for _, u := range urls {
		jobs <- URLJob{URL: u}
	}
	close(jobs)

	// Exactly one result arrives per URL, so a counted receive loop is
	// sufficient — no WaitGroup needed here.
	var allResults []URLResult
	for range urls {
		allResults = append(allResults, <-results)
	}
	return allResults
}

Database Query Pool

// QueryJob identifies one SQL query to run.
type QueryJob struct {
	ID    int    // caller-assigned identifier to correlate results
	Query string // SQL text passed directly to db.Query
}

// QueryResult reports the outcome of one QueryJob, matched to its job
// by ID; Rows is populated on success, Err on failure.
type QueryResult struct {
	ID   int
	Rows []map[string]interface{}
	Err  error
}

// dbWorker runs each queued query against db and sends one QueryResult
// per job — rows on success, the error otherwise. It exits when jobs
// is closed and drained.
func dbWorker(db *sql.DB, jobs <-chan QueryJob, results chan<- QueryResult) {
	for job := range jobs {
		rows, err := db.Query(job.Query)
		if err != nil {
			results <- QueryResult{ID: job.ID, Err: err}
			continue
		}

		data, err := scanRows(rows)
		rows.Close()
		if err != nil {
			// A scan failure was previously swallowed with `data, _ :=`,
			// yielding partial/empty Rows with no signal; report it.
			results <- QueryResult{ID: job.ID, Err: err}
			continue
		}

		results <- QueryResult{ID: job.ID, Rows: data}
	}
}

Image Processing Pool

// ImageJob describes one resize operation: read InputPath, resize to
// Width x Height, and write the result to OutputPath.
type ImageJob struct {
	InputPath  string
	OutputPath string
	Width      int
	Height     int
}

// imageWorker resizes each queued image and sends the per-job error
// (nil on success) to results — exactly one send per job, so callers
// can collect by count.
func imageWorker(id int, jobs <-chan ImageJob, results chan<- error) {
	for job := range jobs {
		err := resizeImage(job.InputPath, job.OutputPath, job.Width, job.Height)
		results <- err
	}
}

// processImages resizes all images with numWorkers concurrent workers.
// It returns nil when every job succeeded; otherwise an error carrying
// the failure count and wrapping the first underlying error.
func processImages(images []ImageJob, numWorkers int) error {
	jobs := make(chan ImageJob, len(images))
	results := make(chan error, len(images))

	for w := 1; w <= numWorkers; w++ {
		go imageWorker(w, jobs, results)
	}

	for _, img := range images {
		jobs <- img
	}
	close(jobs)

	// Exactly one error value (possibly nil) arrives per image.
	var errs []error
	for i := 0; i < len(images); i++ {
		if err := <-results; err != nil {
			errs = append(errs, err)
		}
	}

	if len(errs) > 0 {
		// Previously only the count was reported and the causes were
		// discarded; wrap the first so callers can errors.Is/As into it.
		return fmt.Errorf("%d images failed (first: %w)", len(errs), errs[0])
	}
	return nil
}

Sizing Your Pool

CPU-Bound Tasks

// Use number of CPUs
numWorkers := runtime.NumCPU()

I/O-Bound Tasks

// Can use more workers (10-100x CPU count)
numWorkers := runtime.NumCPU() * 10

Rate-Limited APIs

// Match the API rate limit — worker count equals the rate only when
// each request takes ~1 second; in general, workers ≈ rate × avg request duration
// e.g., 10 requests/second at ~1s per request ≈ 10 workers
numWorkers := 10
Profile your application to find the optimal pool size. Too few workers underutilize resources; too many cause thrashing.

Graceful Shutdown

// Pool is a fixed-size worker pool with two stop signals: closing
// jobs (graceful drain) and closing quit (immediate stop).
type Pool struct {
	workers int           // number of goroutines launched by Start
	jobs    chan Job      // pending work
	results chan Result   // processed output
	wg      sync.WaitGroup // tracks live workers
	quit    chan struct{} // closed to signal immediate shutdown
}

// Start launches p.workers worker goroutines, registering each with
// the WaitGroup so Shutdown can wait for them to finish.
func (p *Pool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}
}

// worker processes jobs until the jobs channel is closed or quit is
// closed. NOTE(review): when both cases are ready, select chooses at
// random, so a closed quit can make a worker exit while jobs are still
// queued — shutdown code must order its channel closes accordingly.
func (p *Pool) worker(id int) {
	defer p.wg.Done()

	for {
		select {
		case job, ok := <-p.jobs:
			if !ok {
				return  // Jobs channel closed and drained
			}
			p.results <- job.Process()
		case <-p.quit:
			return  // Shutdown signal: exit without draining jobs
		}
	}
}

// Shutdown drains and stops the pool, guaranteeing that every job
// already submitted is processed before workers exit. The previous
// ordering closed quit first, which raced: select picks ready cases at
// random, so workers could exit via quit and strand queued jobs.
func (p *Pool) Shutdown() {
	close(p.jobs)    // stop intake; workers drain the remaining jobs
	p.wg.Wait()      // wait for every worker to finish
	close(p.quit)    // release anything else selecting on quit
	close(p.results) // no senders remain; consumers can finish ranging
}

Monitoring Pool Health

// MonitoredPool wraps Pool with counters for observability; they are
// updated atomically by the workers and read via Stats.
type MonitoredPool struct {
	*Pool
	processed atomic.Int64 // jobs completed without error
	failed    atomic.Int64 // jobs whose Execute returned an error
	active    atomic.Int32 // jobs currently being executed
}

// worker processes jobs while maintaining the counters: active
// brackets each Execute call, and exactly one of processed/failed is
// incremented per job before its Result is sent.
func (mp *MonitoredPool) worker(id int) {
	defer mp.wg.Done()

	for job := range mp.jobs {
		mp.active.Add(1)

		result, err := job.Execute()
		if err != nil {
			mp.failed.Add(1)
		} else {
			mp.processed.Add(1)
		}

		mp.active.Add(-1)
		mp.results <- Result{Value: result, Error: err}
	}
}

// Stats returns a point-in-time snapshot of the pool's counters keyed
// by "processed", "failed", and "active".
func (mp *MonitoredPool) Stats() map[string]int64 {
	stats := make(map[string]int64, 3)
	stats["processed"] = mp.processed.Load()
	stats["failed"] = mp.failed.Load()
	stats["active"] = int64(mp.active.Load())
	return stats
}

Best Practices

  1. Size appropriately - Match pool size to workload type
  2. Use buffered channels - Reduce blocking between producer/consumer
  3. Handle errors - Don’t silently ignore worker errors
  4. Implement shutdown - Allow graceful pool termination
  5. Monitor performance - Track throughput and failure rates
  6. Use WaitGroups - Ensure all workers complete
  7. Close channels properly - Close jobs when done sending
  8. Consider context - Support cancellation and timeouts

Common Pitfalls

Not Closing Jobs Channel

// BAD: Workers never exit
for w := 1; w <= 3; w++ {
	go worker(w, jobs, results)
}
for j := 1; j <= 10; j++ {
	jobs <- j
}
// Forgot close(jobs) - workers block forever!

// GOOD: Close when done
for j := 1; j <= 10; j++ {
	jobs <- j
}
close(jobs)

Deadlock on Results

// BAD: Deadlock if results buffer too small
jobs := make(chan Job, 100)
results := make(chan Result)  // Unbuffered!

// Workers block sending results, so they stop pulling jobs; once the
// jobs buffer fills, the main goroutine blocks on jobs <- too — deadlock

// GOOD: Buffer results or consume concurrently
results := make(chan Result, 100)

Forgetting WaitGroup

// BAD: May exit before workers finish
go worker(1, jobs, results)
go worker(2, jobs, results)
close(jobs)
// Program may exit immediately!

// GOOD: Wait for workers
var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); worker(1, jobs, results) }()
go func() { defer wg.Done(); worker(2, jobs, results) }()
wg.Wait()

Build docs developers (and LLMs) love