
Overview

The worker package provides type-safe, generic functions for concurrent processing of slices with bounded concurrency. It handles goroutine management, error propagation, and context cancellation automatically. Built on top of golang.org/x/sync/errgroup, it provides:
  • Bounded concurrency to prevent resource exhaustion
  • Automatic goroutine cleanup
  • First-error-stops-all semantics
  • Context-based cancellation

Import

import "github.com/yourusername/gotemplate/pkg/worker"

Functions

Run

func Run[T any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T) error) error
Processes items concurrently with bounded parallelism. It spawns at most concurrency goroutines and processes each item using the provided function. Returns immediately on first error, cancelling remaining work.
Parameters:
  • T (type parameter): Generic type parameter for the items being processed. Can be any type.
  • ctx (context.Context, required): Context for cancellation. When cancelled or when any function returns an error, all remaining work is cancelled.
  • items ([]T, required): Slice of items to process. Each item will be passed to fn exactly once.
  • concurrency (int, required): Maximum number of concurrent goroutines. Must be at least 1. Use runtime.NumCPU() for CPU-bound work or higher values for I/O-bound work.
  • fn (func(ctx context.Context, item T) error, required): Function to execute for each item. Should return nil on success or an error to stop all processing.
Returns: nil if all items are processed successfully, or the first error encountered.

Example: Concurrent HTTP Requests

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
    
    "github.com/yourusername/gotemplate/pkg/worker"
)

func main() {
    urls := []string{
        "https://api.example.com/users/1",
        "https://api.example.com/users/2",
        "https://api.example.com/users/3",
        "https://api.example.com/users/4",
        "https://api.example.com/users/5",
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    err := worker.Run(ctx, urls, 3, func(ctx context.Context, url string) error {
        req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
            return err
        }
        
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        
        fmt.Printf("Fetched %s: %d\n", url, resp.StatusCode)
        return nil
    })
    
    if err != nil {
        fmt.Printf("Error: %v\n", err)
    }
}

Example: Concurrent File Processing

files := []string{"data1.txt", "data2.txt", "data3.txt"}

err := worker.Run(context.Background(), files, 2, func(ctx context.Context, filename string) error {
    data, err := os.ReadFile(filename)
    if err != nil {
        return fmt.Errorf("read %s: %w", filename, err)
    }
    
    // Process data
    processed := process(data)
    
    outfile := "processed_" + filename
    if err := os.WriteFile(outfile, processed, 0644); err != nil {
        return fmt.Errorf("write %s: %w", outfile, err)
    }
    
    return nil
})

if err != nil {
    log.Fatalf("Processing failed: %v", err)
}

Example: Database Batch Operations

type User struct {
    ID   int
    Name string
}

users := []User{
    {ID: 1, Name: "Alice"},
    {ID: 2, Name: "Bob"},
    {ID: 3, Name: "Charlie"},
}

err := worker.Run(ctx, users, 5, func(ctx context.Context, user User) error {
    _, err := db.ExecContext(ctx, 
        "UPDATE users SET last_seen = NOW() WHERE id = $1",
        user.ID,
    )
    return err
})

Map

func Map[T, R any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T) (R, error)) ([]R, error)
Processes items concurrently and collects results in input order. Similar to Run(), but returns a slice of results with the same length and order as the input items.
Parameters:
  • T (type parameter): Generic type parameter for input items. Can be any type.
  • R (type parameter): Generic type parameter for result values. Can be any type.
  • ctx (context.Context, required): Context for cancellation. When cancelled or when any function returns an error, all remaining work is cancelled.
  • items ([]T, required): Slice of items to process. Each item will be passed to fn exactly once.
  • concurrency (int, required): Maximum number of concurrent goroutines. Must be at least 1.
  • fn (func(ctx context.Context, item T) (R, error), required): Function to execute for each item. Should return a result and nil on success, or a zero value and an error to stop all processing.
Returns: A slice of results in the same order as input items (on success), or nil and an error on failure.

Example: Fetch User Details

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    
    "github.com/yourusername/gotemplate/pkg/worker"
)

type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

func main() {
    userIDs := []int{1, 2, 3, 4, 5}
    
    users, err := worker.Map(context.Background(), userIDs, 3, 
        func(ctx context.Context, id int) (User, error) {
            url := fmt.Sprintf("https://api.example.com/users/%d", id)
            
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return User{}, err
            }
            
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return User{}, err
            }
            defer resp.Body.Close()
            
            var user User
            if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
                return User{}, err
            }
            
            return user, nil
        },
    )
    
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }
    
    // Users are in the same order as userIDs
    for i, user := range users {
        fmt.Printf("User %d: %+v\n", userIDs[i], user)
    }
}

Example: Parallel Data Transformation

numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

squares, err := worker.Map(context.Background(), numbers, 4,
    func(ctx context.Context, n int) (int, error) {
        // Simulate expensive computation
        time.Sleep(100 * time.Millisecond)
        return n * n, nil
    },
)

if err != nil {
    log.Fatal(err)
}

fmt.Println(squares) // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

Example: Image Processing Pipeline

type ImageResult struct {
    Filename string
    Width    int
    Height   int
}

filenames := []string{"img1.jpg", "img2.jpg", "img3.jpg"}

results, err := worker.Map(ctx, filenames, 2,
    func(ctx context.Context, filename string) (ImageResult, error) {
        img, err := loadImage(filename)
        if err != nil {
            return ImageResult{}, err
        }
        
        // Process image
        processed := processImage(img)
        
        // Save result
        outfile := "thumb_" + filename
        if err := saveImage(outfile, processed); err != nil {
            return ImageResult{}, err
        }
        
        return ImageResult{
            Filename: outfile,
            Width:    processed.Width,
            Height:   processed.Height,
        }, nil
    },
)

if err != nil {
    log.Fatalf("Image processing failed: %v", err)
}

for _, result := range results {
    fmt.Printf("Processed: %s (%dx%d)\n", result.Filename, result.Width, result.Height)
}

Example: Type Conversion

strings := []string{"1", "2", "3", "invalid", "5"}

numbers, err := worker.Map(context.Background(), strings, 10,
    func(ctx context.Context, s string) (int, error) {
        return strconv.Atoi(s)
    },
)

if err != nil {
    // Will fail on "invalid" and stop processing
    fmt.Printf("Conversion error: %v\n", err)
}

Complete Example: Web Scraper

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net/http"
    "time"
    
    "github.com/yourusername/gotemplate/pkg/worker"
)

type Page struct {
    URL     string
    Title   string
    Content string
}

func scrape(ctx context.Context, url string) (Page, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return Page{}, err
    }
    
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return Page{}, err
    }
    defer resp.Body.Close()
    
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return Page{}, err
    }
    
    // Extract title (simplified)
    title := extractTitle(string(body))
    
    return Page{
        URL:     url,
        Title:   title,
        Content: string(body),
    }, nil
}

func main() {
    urls := []string{
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        "https://example.com/page4",
        "https://example.com/page5",
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
    defer cancel()
    
    // Scrape all pages concurrently with max 3 concurrent requests
    pages, err := worker.Map(ctx, urls, 3, scrape)
    if err != nil {
        log.Fatalf("Scraping failed: %v", err)
    }
    
    // Process results
    for _, page := range pages {
        fmt.Printf("Scraped: %s - %s\n", page.URL, page.Title)
    }
}

func extractTitle(html string) string {
    // Simplified title extraction
    return "Page Title"
}

Error Handling

First-Error Semantics

Both Run() and Map() stop all work when any function returns an error:
err := worker.Run(ctx, items, 5, func(ctx context.Context, item string) error {
    if item == "bad" {
        return fmt.Errorf("encountered bad item") // Stops all work
    }
    return process(item)
})

Context Cancellation

When the context is cancelled, all goroutines receive the cancellation signal:
ctx, cancel := context.WithCancel(context.Background())

go func() {
    time.Sleep(5 * time.Second)
    cancel() // Cancels all workers
}()

err := worker.Run(ctx, items, 10, func(ctx context.Context, item string) error {
    select {
    case <-ctx.Done():
        return ctx.Err() // Goroutine respects cancellation
    default:
        return process(item)
    }
})

Error Wrapping

Add context to errors for better debugging:
err := worker.Run(ctx, files, 5, func(ctx context.Context, file string) error {
    if err := process(file); err != nil {
        return fmt.Errorf("process %s: %w", file, err)
    }
    return nil
})

Best Practices

Choosing Concurrency Level

CPU-bound work:
import "runtime"

// Use number of CPU cores
concurrency := runtime.NumCPU()
worker.Run(ctx, items, concurrency, cpuIntensiveFunc)
I/O-bound work:
// Use higher concurrency for I/O operations
concurrency := 10  // or 20, 50, 100+ depending on the service
worker.Run(ctx, items, concurrency, networkFunc)
Rate-limited APIs:
// Respect rate limits
concurrency := 5  // Stay within API rate limits
worker.Run(ctx, items, concurrency, apiFunc)

Always Use Context Timeouts

// Good: Prevents hanging indefinitely
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
worker.Run(ctx, items, 10, fn)

// Bad: Could hang forever
worker.Run(context.Background(), items, 10, fn)

Handle Partial Results

When using Map(), remember that on error you get no results:
results, err := worker.Map(ctx, items, 10, fn)
if err != nil {
    // results is nil, no partial results available
    log.Printf("Failed: %v", err)
    return
}
// All results succeeded

Respect Context in Worker Functions

// Good: Checks context regularly
worker.Run(ctx, items, 5, func(ctx context.Context, item string) error {
    if err := ctx.Err(); err != nil {
        return err
    }
    return process(ctx, item)
})

// Bad: Ignores context
worker.Run(ctx, items, 5, func(ctx context.Context, item string) error {
    return process(item) // Doesn't pass context through
})

Use for Appropriate Data Sizes

// Good: Worth parallelizing
items := make([]int, 10000)
worker.Run(ctx, items, 10, expensiveFunc)

// Bad: Overhead may exceed benefit
items := []int{1, 2, 3}
worker.Run(ctx, items, 10, cheapFunc) // Just use a loop instead

Performance Considerations

Goroutine Overhead

  • Each goroutine has ~2KB stack overhead
  • Context switching has CPU cost
  • Use bounded concurrency to prevent resource exhaustion

Memory Usage

For Map(), the entire result slice is allocated upfront:
// Allocates space for 1 million results immediately
items := make([]int, 1_000_000)
results, _ := worker.Map(ctx, items, 100, fn) // Memory: len(items) * sizeof(R)

Order Preservation

Map() preserves order, and the call returns only after every item completes, so faster items effectively wait for the slowest:
items := []int{1, 2, 3}  // Item 2 takes 10s, others take 1s
results, _ := worker.Map(ctx, items, 3, fn)
// Total time: ~10s (waits for slowest item)
// Results are still [result1, result2, result3]

Comparison: Run vs Map

Feature               Run                       Map
Returns results       No                        Yes
Memory overhead       Minimal                   O(n) for results
Order preservation    N/A                       Yes
Use case              Side effects              Transformations
Example               Write files, send emails  Fetch data, parse JSON

Integration with Other Packages

With Retry Package

import (
    "github.com/yourusername/gotemplate/pkg/retry"
    "github.com/yourusername/gotemplate/pkg/worker"
)

err := worker.Run(ctx, urls, 5, func(ctx context.Context, url string) error {
    return retry.Do(ctx, func(ctx context.Context) error {
        return fetchURL(ctx, url)
    }, retry.WithMaxAttempts(3))
})

With Database Connection Pool

err := worker.Run(ctx, records, 20, func(ctx context.Context, record Record) error {
    _, err := db.ExecContext(ctx, "INSERT INTO ...", record.Values())
    return err
})
// Database pool handles actual concurrency limits
