A worker pool is a pattern for distributing work across multiple goroutines. This allows you to control concurrency, limit resource usage, and efficiently process tasks in parallel.
Basic Worker Pool
package main
import (
"fmt"
"time"
)
// worker receives jobs until the jobs channel is closed and drained. For each
// job it logs the start, sleeps for one second to simulate work, logs
// completion, and sends twice the job value on results.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		fmt.Println("worker", id, "started job", job)
		time.Sleep(time.Second) // stand-in for real work
		fmt.Println("worker", id, "finished job", job)
		doubled := job * 2
		results <- doubled
	}
}
// main runs five jobs through a pool of three workers and blocks until one
// result per job has been received.
func main() {
	const (
		numJobs    = 5
		numWorkers = 3
	)

	// Both channels can hold every job/result, so neither side blocks on a
	// full buffer in this example.
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// Launch the workers; they wait on jobs until work arrives.
	for id := 1; id <= numWorkers; id++ {
		go worker(id, jobs, results)
	}

	// Queue every job, then close the channel so the workers' range loops end
	// once the queue is drained.
	for job := 1; job <= numJobs; job++ {
		jobs <- job
	}
	close(jobs)

	// Receive one result per job; the values themselves are discarded.
	for received := 0; received < numJobs; received++ {
		<-results
	}
}
Workers process jobs concurrently from the jobs channel and send results to the results channel. The pool size (3 workers) limits concurrency.
Why Use Worker Pools?
1. Control Concurrency
Limit the number of concurrent operations:
// Without a pool: one goroutine is started per task, so concurrency is
// bounded only by len(tasks).
for _, task := range tasks {
go process(task) // a huge tasks slice means an equally huge number of goroutines
}
// With a pool: tasks are queued on a channel and a fixed set of N workers
// (started elsewhere) receives from it.
for _, task := range tasks {
jobs <- task // at most N tasks are being processed at any moment
}
2. Resource Management
Limit resource consumption (memory, connections, CPU):
// Ten workers means at most ten jobs run at once - for example at most ten
// database connections in use, assuming each job holds one connection at a time.
for w := 1; w <= 10; w++ {
go worker(w, jobs, results)
}
3. Backpressure
A buffered job channel provides natural backpressure, because senders block once the buffer is full:
jobs := make(chan Job, 100) // holds up to 100 queued jobs
// Once 100 jobs are queued, a send on jobs blocks until a worker receives one,
// which slows the producer down to the workers' pace.
Worker Pool Patterns
1. Fixed-Size Pool
// WorkerPool runs a fixed number of goroutines that take Jobs from a shared
// queue and publish each Job's Result on an output channel.
type WorkerPool struct {
	numWorkers int            // goroutines launched by Start
	jobs       chan Job       // queue fed by Submit and drained by the workers
	results    chan Result    // output exposed through Results
	wg         sync.WaitGroup // tracks running workers
}

// NewWorkerPool builds a pool configured for numWorkers workers. The job queue
// and the result channel are each buffered for 100 items. No goroutine runs
// until Start is called.
func NewWorkerPool(numWorkers int) *WorkerPool {
	pool := new(WorkerPool)
	pool.numWorkers = numWorkers
	pool.jobs = make(chan Job, 100)
	pool.results = make(chan Result, 100)
	return pool
}

// Start launches the configured number of worker goroutines.
func (p *WorkerPool) Start() {
	for id := 0; id < p.numWorkers; id++ {
		p.wg.Add(1)
		go p.worker(id)
	}
}

// worker processes jobs until the queue is closed and drained, sending each
// result on the results channel.
func (p *WorkerPool) worker(id int) {
	defer p.wg.Done()
	for {
		job, open := <-p.jobs
		if !open {
			return
		}
		p.results <- job.Process()
	}
}

// Submit queues job for processing. It blocks while the job queue is full.
func (p *WorkerPool) Submit(job Job) {
	p.jobs <- job
}

// Close stops accepting jobs, waits for every worker to finish the queued
// work, and then closes the results channel. Workers block while the results
// buffer is full, so results must either fit in the buffer or be consumed
// concurrently for Close to return.
func (p *WorkerPool) Close() {
	close(p.jobs)
	p.wg.Wait()
	close(p.results)
}

// Results returns the receive-only channel on which workers publish results.
// The channel is closed by Close.
func (p *WorkerPool) Results() <-chan Result {
	return p.results
}
2. Dynamic Pool with Timeout
// DynamicPool is a worker pool that grows on demand, up to maxWorkers
// goroutines, and shrinks again when workers stay idle for idleTimeout.
type DynamicPool struct {
	jobs        chan Job
	results     chan Result
	workers     atomic.Int32 // workers that are reserved or running
	maxWorkers  int
	idleTimeout time.Duration
}

// worker processes jobs until the jobs channel is closed or no job arrives
// within idleTimeout.
//
// The caller must already have counted this worker in dp.workers (Submit does
// so before starting the goroutine); the worker releases its slot when it
// exits.
func (dp *DynamicPool) worker() {
	defer dp.workers.Add(-1)

	timer := time.NewTimer(dp.idleTimeout)
	defer timer.Stop()
	for {
		select {
		case job, ok := <-dp.jobs:
			if !ok {
				return
			}
			dp.results <- job.Process()
		case <-timer.C:
			// Idle timeout - exit worker.
			return
		}

		// Restart the idle countdown. The timer may have fired while the job
		// was running; stop it and discard any pending tick so the stale tick
		// cannot end a worker that has just been busy.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(dp.idleTimeout)
	}
}

// Submit queues job, first starting a new worker if fewer than maxWorkers are
// reserved or running. It blocks while the jobs channel cannot accept the job.
//
// The worker slot is reserved with a compare-and-swap before the goroutine is
// started, so concurrent or rapid Submit calls can never start more than
// maxWorkers workers.
//
// NOTE(review): if the pool is at capacity and every worker times out between
// the capacity check and the send, the job waits until a later Submit starts a
// worker.
func (dp *DynamicPool) Submit(job Job) {
	for {
		n := dp.workers.Load()
		if n >= int32(dp.maxWorkers) {
			break
		}
		if dp.workers.CompareAndSwap(n, n+1) {
			go dp.worker()
			break
		}
		// Lost the race against another Submit or an exiting worker; re-check.
	}
	dp.jobs <- job
}
3. Pool with Error Handling
// Result pairs the value a job returned with the error it returned, so both
// can travel over a single channel.
type Result struct {
	Value any
	Error error
}
// worker executes every job received from jobs and reports the returned value
// and error together as one Result. It returns once jobs is closed and drained.
func worker(id int, jobs <-chan Job, results chan<- Result) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		var res Result
		res.Value, res.Error = job.Execute()
		results <- res
	}
}
// processResults receives up to count results. Successful values are passed to
// handleSuccess; failures are collected.
//
// It stops early if results is closed before count values arrive, so zero
// values from a closed channel are never mistaken for successes.
//
// It returns nil when no result carried an error. Otherwise the returned error
// reports how many results failed and wraps the first failure, so callers can
// still inspect the cause with errors.Is / errors.As.
func processResults(results <-chan Result, count int) error {
	var errs []error
	for i := 0; i < count; i++ {
		result, ok := <-results
		if !ok {
			break
		}
		if result.Error != nil {
			errs = append(errs, result.Error)
			continue
		}
		handleSuccess(result.Value)
	}
	if len(errs) == 0 {
		return nil
	}
	return fmt.Errorf("encountered %d errors, first: %w", len(errs), errs[0])
}
4. Pool with Context
// worker executes jobs until the jobs channel is closed or ctx is cancelled.
// Both the wait for the next job and the delivery of its result give up as
// soon as ctx is done, so a cancelled worker never stays blocked on either
// channel.
func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result) {
	cancelled := ctx.Done()
	for {
		var (
			next Job
			open bool
		)
		select {
		case next, open = <-jobs:
		case <-cancelled:
			return
		}
		if !open {
			return
		}

		outcome := next.Execute(ctx)
		select {
		case <-cancelled:
			return
		case results <- outcome:
		}
	}
}
// RunPool executes jobs on a pool of worker goroutines and returns the results
// that were produced before the workers stopped. Results arrive in completion
// order, not input order. If ctx is cancelled the returned slice may hold
// fewer results than there were jobs.
//
// numWorkers is clamped to the range [1, len(jobs)]: a non-positive value
// would otherwise start no workers and silently drop every job, and more
// workers than jobs would only add idle goroutines. With no jobs, RunPool
// returns nil without starting any worker.
func RunPool(ctx context.Context, numWorkers int, jobs []Job) []Result {
	if len(jobs) == 0 {
		return nil
	}
	if numWorkers < 1 {
		numWorkers = 1
	}
	if numWorkers > len(jobs) {
		numWorkers = len(jobs)
	}

	// Both channels can hold every job/result, so neither the send loop below
	// nor the workers block on a full buffer.
	jobCh := make(chan Job, len(jobs))
	resultCh := make(chan Result, len(jobs))

	// Start workers.
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(ctx, id, jobCh, resultCh)
		}(i)
	}

	// Send jobs, then close the queue so workers exit after draining it.
	for _, job := range jobs {
		jobCh <- job
	}
	close(jobCh)

	// Close results once every worker has returned, which ends the range below.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Collect results.
	var results []Result
	for result := range resultCh {
		results = append(results, result)
	}
	return results
}
Practical Examples
HTTP Request Pool
// Request and response types for the HTTP worker pool.
type (
	// URLJob names one URL to fetch.
	URLJob struct {
		URL string
	}

	// URLResult is the outcome of fetching one URL: the URL itself, the HTTP
	// status code and body of the response, or the error that prevented the
	// fetch.
	URLResult struct {
		URL    string
		Status int
		Body   string
		Error  error
	}
)
// httpWorker fetches every URL received on jobs with a GET request and sends
// one URLResult per job on results. It returns once jobs is closed and drained.
//
// A single client with a 10-second timeout is reused for all requests made by
// this worker. A failed request is reported with only URL and Error set. A
// response whose body cannot be read completely is reported with its status
// code and the read error rather than being passed off as a success with a
// truncated body.
func httpWorker(id int, jobs <-chan URLJob, results chan<- URLResult) {
	client := &http.Client{Timeout: 10 * time.Second}
	for job := range jobs {
		resp, err := client.Get(job.URL)
		if err != nil {
			results <- URLResult{URL: job.URL, Error: err}
			continue
		}

		body, err := io.ReadAll(resp.Body)
		// Close right away instead of deferring: a defer inside this loop
		// would keep every body open until the worker exits.
		resp.Body.Close()
		if err != nil {
			results <- URLResult{URL: job.URL, Status: resp.StatusCode, Error: err}
			continue
		}

		results <- URLResult{
			URL:    job.URL,
			Status: resp.StatusCode,
			Body:   string(body),
		}
	}
}
// fetchURLs fetches every URL using a pool of httpWorker goroutines and
// returns one URLResult per URL, in completion order (not input order).
//
// numWorkers is clamped to the range [1, len(urls)]: with a non-positive value
// no worker would ever produce a result and the collection loop would block
// forever. With no URLs, fetchURLs returns nil without starting any worker.
func fetchURLs(urls []string, numWorkers int) []URLResult {
	if len(urls) == 0 {
		return nil
	}
	if numWorkers < 1 {
		numWorkers = 1
	}
	if numWorkers > len(urls) {
		numWorkers = len(urls)
	}

	// Both channels can hold every job/result, so neither side blocks on a
	// full buffer.
	jobs := make(chan URLJob, len(urls))
	results := make(chan URLResult, len(urls))

	// Start workers.
	for w := 1; w <= numWorkers; w++ {
		go httpWorker(w, jobs, results)
	}

	// Submit jobs, then close the queue so the workers exit after draining it.
	for _, u := range urls {
		jobs <- URLJob{URL: u}
	}
	close(jobs)

	// Collect exactly one result per submitted URL.
	allResults := make([]URLResult, 0, len(urls))
	for range urls {
		allResults = append(allResults, <-results)
	}
	return allResults
}
Database Query Pool
// Request and response types for the database worker pool.
type (
	// QueryJob is one query to run; ID lets the caller match the result back
	// to the job.
	QueryJob struct {
		ID    int
		Query string
	}

	// QueryResult carries the rows produced for the job with the same ID, or
	// the error that prevented them from being produced.
	QueryResult struct {
		ID   int
		Rows []map[string]any
		Err  error
	}
)
// dbWorker runs every query received on jobs against db and sends one
// QueryResult per job on results. It returns once jobs is closed and drained.
//
// A job is reported as failed (Err set, Rows nil) when the query fails, when
// scanRows fails, or when closing the row set fails; a scan error takes
// precedence over a close error.
func dbWorker(db *sql.DB, jobs <-chan QueryJob, results chan<- QueryResult) {
	for job := range jobs {
		rows, err := db.Query(job.Query)
		if err != nil {
			results <- QueryResult{ID: job.ID, Err: err}
			continue
		}

		data, err := scanRows(rows)
		// Always close the row set, even after a scan error, so its
		// connection is returned to the pool.
		if closeErr := rows.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			results <- QueryResult{ID: job.ID, Err: err}
			continue
		}

		results <- QueryResult{ID: job.ID, Rows: data}
	}
}
Image Processing Pool
// ImageJob holds the arguments for one resize operation: where to read the
// image from, where to write the result, and the requested dimensions.
type ImageJob struct {
	InputPath, OutputPath string
	Width, Height         int
}
// imageWorker resizes every image described on jobs and sends the outcome of
// each resize (nil on success) on results. It returns once jobs is closed and
// drained.
func imageWorker(id int, jobs <-chan ImageJob, results chan<- error) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		results <- resizeImage(job.InputPath, job.OutputPath, job.Width, job.Height)
	}
}
// processImages resizes all images using a pool of imageWorker goroutines and
// waits for every image to finish.
//
// numWorkers is clamped to the range [1, len(images)]: with a non-positive
// value no worker would ever report a result and the collection loop would
// block forever. With no images it returns nil without starting any worker.
//
// It returns nil when every image succeeded. Otherwise the returned error
// reports how many images failed and wraps the first failure, so callers can
// still inspect the cause with errors.Is / errors.As.
func processImages(images []ImageJob, numWorkers int) error {
	if len(images) == 0 {
		return nil
	}
	if numWorkers < 1 {
		numWorkers = 1
	}
	if numWorkers > len(images) {
		numWorkers = len(images)
	}

	// Both channels can hold every job/result, so neither side blocks on a
	// full buffer.
	jobs := make(chan ImageJob, len(images))
	results := make(chan error, len(images))

	for w := 1; w <= numWorkers; w++ {
		go imageWorker(w, jobs, results)
	}

	for _, img := range images {
		jobs <- img
	}
	close(jobs) // lets the workers exit once the queue is drained

	// Receive exactly one outcome per image.
	var errs []error
	for range images {
		if err := <-results; err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) == 0 {
		return nil
	}
	return fmt.Errorf("%d images failed, first: %w", len(errs), errs[0])
}
Sizing Your Pool
CPU-Bound Tasks
// CPU-bound work cannot run faster than the hardware allows, so start with
// one worker per logical CPU reported by the runtime.
numWorkers := runtime.NumCPU()
I/O-Bound Tasks
// I/O-bound workers spend most of their time waiting, so the pool can be much
// larger than the CPU count (10-100x is a common range); 10x is used here as a
// starting point.
numWorkers := runtime.NumCPU() * 10
Rate-Limited APIs
// Keep the pool small enough to stay under the API's rate limit.
// Note that a pool bounds concurrency, not request rate: 10 workers produce
// roughly 10 requests/second only if each request takes about one second.
// To enforce a real rate, also pace the requests (e.g. with a time.Ticker).
numWorkers := 10
Profile your application to find the optimal pool size. Too few workers underutilize resources; too many cause thrashing.
Graceful Shutdown
// Pool is a fixed-size worker pool that supports a graceful shutdown: jobs
// already queued when Shutdown is called are still processed.
type Pool struct {
	workers int            // goroutines launched by Start
	jobs    chan Job       // work queue; closed by Shutdown
	results chan Result    // output channel; closed by Shutdown after the drain
	wg      sync.WaitGroup // tracks running workers
	quit    chan struct{}  // closed by Shutdown once every worker has exited
}

// Start launches p.workers worker goroutines.
func (p *Pool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}
}

// worker processes jobs until the jobs channel is closed and drained. It also
// returns if quit is closed first; Shutdown itself only closes quit after all
// workers have exited, so a normal shutdown always takes the drain path.
func (p *Pool) worker(id int) {
	defer p.wg.Done()
	for {
		select {
		case job, ok := <-p.jobs:
			if !ok {
				return // Jobs channel closed and drained
			}
			p.results <- job.Process()
		case <-p.quit:
			return // Shutdown signal
		}
	}
}

// Shutdown stops the pool gracefully. It closes the jobs channel, waits for
// the workers to finish everything that was already queued, and then closes
// quit and results.
//
// quit is deliberately closed only after the workers are done: closing it
// together with jobs would make both select cases in worker ready at once,
// and because select picks a ready case at random the workers would abandon
// an arbitrary subset of the queued jobs.
//
// Workers block while the results buffer is full, so results must fit in the
// buffer or be consumed concurrently for Shutdown to return. Shutdown must be
// called at most once.
func (p *Pool) Shutdown() {
	close(p.jobs)    // No more work; workers drain what is queued
	p.wg.Wait()      // Wait for workers to finish
	close(p.quit)    // Signal that the pool has stopped
	close(p.results) // Close results
}
Monitoring Pool Health
// MonitoredPool wraps a Pool and counts the jobs its workers have handled.
type MonitoredPool struct {
	*Pool
	processed atomic.Int64 // jobs that finished without error
	failed    atomic.Int64 // jobs that returned an error
	active    atomic.Int32 // jobs currently executing
}

// Start launches mp.workers goroutines running the instrumented worker.
//
// It shadows the Start promoted from the embedded Pool. Embedding does not
// give virtual dispatch: the promoted Pool.Start calls Pool.worker, so without
// this method the instrumented worker would never run and every counter would
// stay at zero.
func (mp *MonitoredPool) Start() {
	for i := 0; i < mp.workers; i++ {
		mp.wg.Add(1)
		go mp.worker(i)
	}
}

// worker executes jobs until the jobs channel is closed and drained, updating
// the counters around each execution and sending every outcome on results.
func (mp *MonitoredPool) worker(id int) {
	defer mp.wg.Done()
	for job := range mp.jobs {
		mp.active.Add(1)
		result, err := job.Execute()
		if err != nil {
			mp.failed.Add(1)
		} else {
			mp.processed.Add(1)
		}
		// The job stops counting as active before its result is delivered, so
		// a slow results consumer does not inflate the gauge.
		mp.active.Add(-1)
		mp.results <- Result{Value: result, Error: err}
	}
}

// Stats returns a snapshot of the counters under the keys "processed",
// "failed" and "active". Each counter is loaded separately, so the three
// values are not guaranteed to come from the same instant.
func (mp *MonitoredPool) Stats() map[string]int64 {
	return map[string]int64{
		"processed": mp.processed.Load(),
		"failed":    mp.failed.Load(),
		"active":    int64(mp.active.Load()),
	}
}
Best Practices
- Size appropriately - Match pool size to workload type
- Use buffered channels - Reduce blocking between producer/consumer
- Handle errors - Don’t silently ignore worker errors
- Implement shutdown - Allow graceful pool termination
- Monitor performance - Track throughput and failure rates
- Use WaitGroups - Ensure all workers complete
- Close channels properly - Close jobs when done sending
- Consider context - Support cancellation and timeouts
Common Pitfalls
Not Closing Jobs Channel
// BAD: the workers range over jobs, and a range over a channel only ends when
// the channel is closed.
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= 10; j++ {
jobs <- j
}
// close(jobs) is missing here, so after the last job every worker stays
// blocked on the receive forever (a goroutine leak).
// GOOD: the sender closes jobs as soon as it has nothing more to send.
for j := 1; j <= 10; j++ {
jobs <- j
}
close(jobs)
Deadlock on Results
// BAD: deadlock when the producer sends all jobs before reading any result.
jobs := make(chan Job, 100)
results := make(chan Result) // unbuffered: a send blocks until someone receives
// Each worker blocks sending its first result, so no worker takes more jobs.
// Once the jobs buffer is full, the producer blocks sending jobs too, and
// nothing can make progress.
// GOOD: give results enough buffer for the outstanding work, or read results
// in a separate goroutine while jobs are still being sent.
results := make(chan Result, 100)
Forgetting WaitGroup
// BAD: nothing waits for the workers, so the caller can carry on (or the
// program can exit) while jobs are still being processed.
go worker(1, jobs, results)
go worker(2, jobs, results)
close(jobs)
// Program may exit immediately, killing the workers mid-job!
// GOOD: Wait for workers
var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); worker(1, jobs, results) }()
go func() { defer wg.Done(); worker(2, jobs, results) }()
// Close jobs before waiting: the workers range over jobs and only return once
// it is closed, so without this wg.Wait() would block forever.
close(jobs)
wg.Wait()