Overview
The worker package provides type-safe, generic functions for concurrent processing of slices with bounded concurrency. It handles goroutine management, error propagation, and context cancellation automatically.
Built on top of golang.org/x/sync/errgroup, it provides:
- Bounded concurrency to prevent resource exhaustion
- Automatic goroutine cleanup
- First-error-stops-all semantics
- Context-based cancellation
Import
import "github.com/yourusername/gotemplate/pkg/worker"
Functions
Run
func Run[T any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T) error) error
Processes items concurrently with bounded parallelism: at most concurrency goroutines run at once, and each item is processed by the provided function. On the first error, the shared context is cancelled so remaining items are skipped, and that error is returned once in-flight goroutines finish.
T
type parameter
Generic type parameter for the items being processed. Can be any type.
ctx
context.Context
required
Context for cancellation. When cancelled or when any function returns an error, all remaining work is cancelled.
items
[]T
required
Slice of items to process. Each item will be passed to fn exactly once.
concurrency
int
required
Maximum number of concurrent goroutines. Must be at least 1. Use runtime.NumCPU() for CPU-bound work or higher values for I/O-bound work.
fn
func(ctx context.Context, item T) error
required
Function to execute for each item. Should return nil on success or an error to stop all processing.
Returns: nil if all items are processed successfully, or the first error encountered.
Example: Concurrent HTTP Requests
package main
import (
"context"
"fmt"
"net/http"
"time"
"github.com/yourusername/gotemplate/pkg/worker"
)
func main() {
urls := []string{
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3",
"https://api.example.com/users/4",
"https://api.example.com/users/5",
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := worker.Run(ctx, urls, 3, func(ctx context.Context, url string) error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
fmt.Printf("Fetched %s: %d\n", url, resp.StatusCode)
return nil
})
if err != nil {
fmt.Printf("Error: %v\n", err)
}
}
Example: Concurrent File Processing
files := []string{"data1.txt", "data2.txt", "data3.txt"}
err := worker.Run(context.Background(), files, 2, func(ctx context.Context, filename string) error {
data, err := os.ReadFile(filename)
if err != nil {
return fmt.Errorf("read %s: %w", filename, err)
}
// Process data
processed := process(data)
outfile := "processed_" + filename
if err := os.WriteFile(outfile, processed, 0644); err != nil {
return fmt.Errorf("write %s: %w", outfile, err)
}
return nil
})
if err != nil {
log.Fatalf("Processing failed: %v", err)
}
Example: Database Batch Operations
type User struct {
ID int
Name string
}
users := []User{
{ID: 1, Name: "Alice"},
{ID: 2, Name: "Bob"},
{ID: 3, Name: "Charlie"},
}
err := worker.Run(ctx, users, 5, func(ctx context.Context, user User) error {
_, err := db.ExecContext(ctx,
"UPDATE users SET last_seen = NOW() WHERE id = $1",
user.ID,
)
return err
})
Map
func Map[T, R any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T) (R, error)) ([]R, error)
Processes items concurrently and collects results in input order. Similar to Run(), but returns a slice of results with the same length and order as the input items.
T
type parameter
Generic type parameter for input items. Can be any type.
R
type parameter
Generic type parameter for result values. Can be any type.
ctx
context.Context
required
Context for cancellation. When cancelled or when any function returns an error, all remaining work is cancelled.
items
[]T
required
Slice of items to process. Each item will be passed to fn exactly once.
concurrency
int
required
Maximum number of concurrent goroutines. Must be at least 1.
fn
func(ctx context.Context, item T) (R, error)
required
Function to execute for each item. Should return a result and nil on success, or a zero value and an error to stop all processing.
Returns: A slice of results in the same order as input items (on success), or nil and an error on failure.
Example: Fetch User Details
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/yourusername/gotemplate/pkg/worker"
)
type User struct {
ID int `json:"id"`
Name string `json:"name"`
}
func main() {
userIDs := []int{1, 2, 3, 4, 5}
users, err := worker.Map(context.Background(), userIDs, 3,
func(ctx context.Context, id int) (User, error) {
url := fmt.Sprintf("https://api.example.com/users/%d", id)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return User{}, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return User{}, err
}
defer resp.Body.Close()
var user User
if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
return User{}, err
}
return user, nil
},
)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
// Users are in the same order as userIDs
for i, user := range users {
fmt.Printf("User %d: %+v\n", userIDs[i], user)
}
}
Example: Parallel Computation
numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
squares, err := worker.Map(context.Background(), numbers, 4,
func(ctx context.Context, n int) (int, error) {
// Simulate expensive computation
time.Sleep(100 * time.Millisecond)
return n * n, nil
},
)
if err != nil {
log.Fatal(err)
}
fmt.Println(squares) // [1 4 9 16 25 36 49 64 81 100]
Example: Image Processing Pipeline
type ImageResult struct {
Filename string
Width int
Height int
}
filenames := []string{"img1.jpg", "img2.jpg", "img3.jpg"}
results, err := worker.Map(ctx, filenames, 2,
func(ctx context.Context, filename string) (ImageResult, error) {
img, err := loadImage(filename)
if err != nil {
return ImageResult{}, err
}
// Process image
processed := processImage(img)
// Save result
outfile := "thumb_" + filename
if err := saveImage(outfile, processed); err != nil {
return ImageResult{}, err
}
return ImageResult{
Filename: outfile,
Width: processed.Width,
Height: processed.Height,
}, nil
},
)
if err != nil {
log.Fatalf("Image processing failed: %v", err)
}
for _, result := range results {
fmt.Printf("Processed: %s (%dx%d)\n", result.Filename, result.Width, result.Height)
}
Example: Type Conversion
strings := []string{"1", "2", "3", "invalid", "5"}
numbers, err := worker.Map(context.Background(), strings, 10,
func(ctx context.Context, s string) (int, error) {
return strconv.Atoi(s)
},
)
if err != nil {
// Will fail on "invalid" and stop processing
fmt.Printf("Conversion error: %v\n", err)
}
Complete Example: Web Scraper
package main
import (
"context"
"fmt"
"io"
"log"
"net/http"
"time"
"github.com/yourusername/gotemplate/pkg/worker"
)
type Page struct {
URL string
Title string
Content string
}
func scrape(ctx context.Context, url string) (Page, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return Page{}, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return Page{}, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return Page{}, err
}
// Extract title (simplified)
title := extractTitle(string(body))
return Page{
URL: url,
Title: title,
Content: string(body),
}, nil
}
func main() {
urls := []string{
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
"https://example.com/page4",
"https://example.com/page5",
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
// Scrape all pages concurrently with max 3 concurrent requests
pages, err := worker.Map(ctx, urls, 3, scrape)
if err != nil {
log.Fatalf("Scraping failed: %v", err)
}
// Process results
for _, page := range pages {
fmt.Printf("Scraped: %s - %s\n", page.URL, page.Title)
}
}
func extractTitle(html string) string {
// Simplified title extraction
return "Page Title"
}
Error Handling
First-Error Semantics
Both Run() and Map() stop scheduling new work when any function returns an error; the shared context is cancelled and the first error is returned:
err := worker.Run(ctx, items, 5, func(ctx context.Context, item string) error {
if item == "bad" {
return fmt.Errorf("encountered bad item") // Stops all work
}
return process(item)
})
Context Cancellation
When the context is cancelled, all goroutines receive the cancellation signal:
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(5 * time.Second)
cancel() // Cancels all workers
}()
err := worker.Run(ctx, items, 10, func(ctx context.Context, item string) error {
select {
case <-ctx.Done():
return ctx.Err() // Goroutine respects cancellation
default:
return process(item)
}
})
Error Wrapping
Add context to errors for better debugging:
err := worker.Run(ctx, files, 5, func(ctx context.Context, file string) error {
if err := process(file); err != nil {
return fmt.Errorf("process %s: %w", file, err)
}
return nil
})
Best Practices
Choosing Concurrency Level
CPU-bound work:
import "runtime"
// Use number of CPU cores
concurrency := runtime.NumCPU()
worker.Run(ctx, items, concurrency, cpuIntensiveFunc)
I/O-bound work:
// Use higher concurrency for I/O operations
concurrency := 10 // or 20, 50, 100+ depending on the service
worker.Run(ctx, items, concurrency, networkFunc)
Rate-limited APIs:
// Respect rate limits
concurrency := 5 // Stay within API rate limits
worker.Run(ctx, items, concurrency, apiFunc)
Always Use Context Timeouts
// Good: Prevents hanging indefinitely
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
worker.Run(ctx, items, 10, fn)
// Bad: Could hang forever
worker.Run(context.Background(), items, 10, fn)
Handle Partial Results
When using Map(), remember that on error you get no results:
results, err := worker.Map(ctx, items, 10, fn)
if err != nil {
// results is nil, no partial results available
log.Printf("Failed: %v", err)
return
}
// All results succeeded
Respect Context in Worker Functions
// Good: Checks context regularly
worker.Run(ctx, items, 5, func(ctx context.Context, item string) error {
if err := ctx.Err(); err != nil {
return err
}
return process(ctx, item)
})
// Bad: Ignores context
worker.Run(ctx, items, 5, func(ctx context.Context, item string) error {
return process(item) // Doesn't pass context through
})
Use for Appropriate Data Sizes
// Good: Worth parallelizing
items := make([]int, 10000)
worker.Run(ctx, items, 10, expensiveFunc)
// Bad: Overhead may exceed benefit
items := []int{1, 2, 3}
worker.Run(ctx, items, 10, cheapFunc) // Just use a loop instead
Goroutine Overhead
- Each goroutine has ~2KB stack overhead
- Context switching has CPU cost
- Use bounded concurrency to prevent resource exhaustion
Memory Usage
For Map(), the entire result slice is allocated upfront:
// Allocates space for 1 million results immediately
items := make([]int, 1_000_000)
results, _ := worker.Map(ctx, items, 100, fn) // Memory: len(items) * sizeof(R)
Order Preservation
Map() preserves input order, so results are not available until the slowest item finishes, even when faster items complete first:
items := []int{1, 2, 3} // Item 2 takes 10s, others take 1s
results, _ := worker.Map(ctx, items, 3, fn)
// Total time: ~10s (waits for slowest item)
// Results are still [result1, result2, result3]
Comparison: Run vs Map
| Feature | Run | Map |
|---------|-----|-----|
| Returns results | No | Yes |
| Memory overhead | Minimal | O(n) for results |
| Order preservation | N/A | Yes |
| Use case | Side effects | Transformations |
| Example | Write files, send emails | Fetch data, parse JSON |
Integration with Other Packages
With Retry Package
import (
"github.com/yourusername/gotemplate/pkg/retry"
"github.com/yourusername/gotemplate/pkg/worker"
)
err := worker.Run(ctx, urls, 5, func(ctx context.Context, url string) error {
return retry.Do(ctx, func(ctx context.Context) error {
return fetchURL(ctx, url)
}, retry.WithMaxAttempts(3))
})
With Database Connection Pool
err := worker.Run(ctx, records, 20, func(ctx context.Context, record Record) error {
    _, err := db.ExecContext(ctx, "INSERT INTO ...", record.Values())
    return err
})
// Database pool handles actual concurrency limits