The worker package provides generic, type-safe primitives for processing slices of items in parallel with bounded concurrency and context cancellation support.
Installation
import "github.com/aarock1234/go-template/pkg/worker"
Quick Start
Process items in parallel:
package main
import (
"context"
"fmt"
"github.com/aarock1234/go-template/pkg/worker"
)
func main() {
ctx := context.Background()
items := []int{1, 2, 3, 4, 5}
err := worker.Run(ctx, items, 3, func(ctx context.Context, item int) error {
fmt.Printf("Processing: %d\n", item)
return doWork(item)
})
if err != nil {
fmt.Println("Error:", err)
}
}
Core Functions
Run
Processes items concurrently with bounded parallelism.
func Run[T any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T) error) error
Context for cancellation control. When cancelled, all workers stop.
Slice of items to process. Can be any type.
Maximum number of concurrent goroutines. Use runtime.NumCPU() for CPU-bound work.
fn
func(ctx context.Context, item T) error
Function to execute for each item. Return error to stop processing.
Behavior:
- Processes items in parallel with at most concurrency goroutines
- Returns the first error encountered
- Cancels remaining work on first error
- Blocks until all work is complete or an error occurs
Example:
urls := []string{"https://api.example.com/1", "https://api.example.com/2"}
err := worker.Run(ctx, urls, 5, func(ctx context.Context, url string) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
// Process response...
return nil
})
Map
Processes items concurrently and collects results in order.
func Map[T, R any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T) (R, error)) ([]R, error)
Context for cancellation control
Slice of items to transform. Can be any type.
Maximum number of concurrent goroutines
fn
func(ctx context.Context, item T) (R, error)
Function that transforms each item. Returns result or error.
Behavior:
- Processes items in parallel with bounded concurrency
- Returns results in the same order as input items
- On error, returns nil and the first error encountered
- Cancels remaining work on first error
Example:
userIDs := []int{1, 2, 3, 4, 5}
users, err := worker.Map(ctx, userIDs, 3, func(ctx context.Context, id int) (User, error) {
return fetchUser(ctx, id)
})
if err != nil {
return err
}
// users[0] corresponds to userIDs[0], etc.
for _, user := range users {
fmt.Println(user.Name)
}
Usage Examples
HTTP Request Processing
package main
import (
"context"
"fmt"
"io"
"net/http"
"github.com/aarock1234/go-template/pkg/worker"
)
func downloadFiles(ctx context.Context, urls []string) error {
return worker.Run(ctx, urls, 10, func(ctx context.Context, url string) error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Printf("Downloaded %d bytes from %s\n", len(data), url)
return nil
})
}
Database Batch Processing
import (
"context"
"database/sql"
"github.com/aarock1234/go-template/pkg/worker"
)
type Record struct {
ID int
Data string
}
func processRecords(ctx context.Context, db *sql.DB, records []Record) error {
return worker.Run(ctx, records, 5, func(ctx context.Context, record Record) error {
_, err := db.ExecContext(ctx,
"INSERT INTO processed (id, data) VALUES ($1, $2)",
record.ID, record.Data,
)
return err
})
}
API Data Fetching
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/aarock1234/go-template/pkg/worker"
)
type User struct {
ID int `json:"id"`
Name string `json:"name"`
}
func fetchUsers(ctx context.Context, ids []int) ([]User, error) {
return worker.Map(ctx, ids, 5, func(ctx context.Context, id int) (User, error) {
url := fmt.Sprintf("https://api.example.com/users/%d", id)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return User{}, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return User{}, err
}
defer resp.Body.Close()
var user User
if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
return User{}, err
}
return user, nil
})
}
File Processing
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"os"
"github.com/aarock1234/go-template/pkg/worker"
)
type FileHash struct {
Path string
Hash string
}
func hashFiles(ctx context.Context, paths []string) ([]FileHash, error) {
return worker.Map(ctx, paths, 4, func(ctx context.Context, path string) (FileHash, error) {
data, err := os.ReadFile(path)
if err != nil {
return FileHash{}, fmt.Errorf("reading %s: %w", path, err)
}
hash := sha256.Sum256(data)
return FileHash{
Path: path,
Hash: hex.EncodeToString(hash[:]),
}, nil
})
}
With Context Cancellation
import (
"context"
"errors"
"fmt"
"time"
"github.com/aarock1234/go-template/pkg/worker"
)
func processWithTimeout() error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
items := make([]int, 1000)
for i := range items {
items[i] = i
}
err := worker.Run(ctx, items, 10, func(ctx context.Context, item int) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// Your processing logic
time.Sleep(100 * time.Millisecond)
fmt.Printf("Processed: %d\n", item)
return nil
}
})
if errors.Is(err, context.DeadlineExceeded) {
fmt.Println("Processing timed out")
}
return err
}
Error Handling
import (
"context"
"errors"
"fmt"
"log"
"github.com/aarock1234/go-template/pkg/worker"
)
var ErrInvalidItem = errors.New("invalid item")
func processWithErrorHandling(ctx context.Context, items []string) error {
err := worker.Run(ctx, items, 5, func(ctx context.Context, item string) error {
if item == "" {
// This error will stop all processing
return fmt.Errorf("%w: empty string", ErrInvalidItem)
}
log.Printf("Processing: %s", item)
return doSomething(item)
})
if err != nil {
if errors.Is(err, ErrInvalidItem) {
return fmt.Errorf("validation failed: %w", err)
}
return fmt.Errorf("processing failed: %w", err)
}
return nil
}
Choosing Concurrency Level
CPU-Bound Work
For operations that primarily use CPU:
import "runtime"
concurrency := runtime.NumCPU()
err := worker.Run(ctx, items, concurrency, cpuBoundWork)
I/O-Bound Work
For network requests, database queries, or file I/O:
// Higher concurrency for I/O operations
concurrency := 50
err := worker.Run(ctx, items, concurrency, ioBoundWork)
Rate-Limited APIs
When calling rate-limited APIs:
// Lower concurrency to respect rate limits
concurrency := 5
err := worker.Run(ctx, items, concurrency, apiCall)
Best Practices
Use Run for side effects: When you only care about processing items without collecting results, use Run. It’s more memory efficient.
Use Map for transformations: When you need to collect results in order, use Map. Results are guaranteed to match input order.
First error stops all work: Both Run and Map stop processing on the first error. Design your error handling accordingly.
Choose appropriate concurrency: Start with runtime.NumCPU() for CPU-bound work, or 10-50 for I/O-bound work. Tune based on your specific use case.
Pass context to operations: Always pass the context parameter to operations like HTTP requests and database queries to ensure they can be cancelled.
Performance Characteristics
Memory:
Run: O(concurrency) goroutines
Map: O(concurrency) goroutines + O(n) result storage
Ordering:
Run: No ordering guarantees
Map: Results ordered same as input
Error Handling:
- Both functions stop on first error
- Remaining work is cancelled via context
Complete Example
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/aarock1234/go-template/pkg/worker"
)
type Result struct {
URL string
Status int
Time time.Duration
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
urls := []string{
"https://api.example.com/endpoint1",
"https://api.example.com/endpoint2",
"https://api.example.com/endpoint3",
"https://api.example.com/endpoint4",
"https://api.example.com/endpoint5",
}
results, err := worker.Map(ctx, urls, 3, func(ctx context.Context, url string) (Result, error) {
start := time.Now()
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return Result{}, fmt.Errorf("creating request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return Result{}, fmt.Errorf("executing request: %w", err)
}
defer resp.Body.Close()
return Result{
URL: url,
Status: resp.StatusCode,
Time: time.Since(start),
}, nil
})
if err != nil {
log.Fatalf("Processing failed: %v", err)
}
// Results are in same order as input URLs
for i, result := range results {
fmt.Printf("URL %d: %s - Status: %d - Time: %v\n",
i+1, result.URL, result.Status, result.Time)
}
}