Skip to main content
The worker package provides generic, type-safe primitives for processing slices of items in parallel with bounded concurrency and context cancellation support.

Installation

import "github.com/aarock1234/go-template/pkg/worker"

Quick Start

Process items in parallel:
package main

import (
    "context"
    "fmt"
    
    "github.com/aarock1234/go-template/pkg/worker"
)

func main() {
    ctx := context.Background()
    items := []int{1, 2, 3, 4, 5}
    
    err := worker.Run(ctx, items, 3, func(ctx context.Context, item int) error {
        fmt.Printf("Processing: %d\n", item)
        return doWork(item)
    })
    
    if err != nil {
        fmt.Println("Error:", err)
    }
}

Core Functions

Run

Processes items concurrently with bounded parallelism.
func Run[T any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T) error) error
ctx
context.Context
Context for cancellation control. When cancelled, all workers stop.
items
[]T
Slice of items to process. The element type T can be any type.
concurrency
int
Maximum number of concurrent goroutines. Use runtime.NumCPU() for CPU-bound work.
fn
func(ctx context.Context, item T) error
Function to execute for each item. Return a non-nil error to stop processing the remaining items.
Behavior:
  • Processes items in parallel with at most concurrency goroutines
  • Returns the first error encountered
  • Cancels remaining work on first error
  • Blocks until all work is complete or an error occurs
Example:
urls := []string{"https://api.example.com/1", "https://api.example.com/2"}

// Fetch up to five URLs at a time; the first failure cancels the rest.
err := worker.Run(ctx, urls, 5, func(ctx context.Context, url string) error {
    // Build the request with the worker's context so that an in-flight call
    // is aborted when processing is cancelled. http.Get would ignore ctx.
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // Process response...
    return nil
})

Map

Processes items concurrently and collects results in order.
func Map[T, R any](ctx context.Context, items []T, concurrency int, fn func(ctx context.Context, item T) (R, error)) ([]R, error)
ctx
context.Context
Context for cancellation control
items
[]T
Input items to process
concurrency
int
Maximum number of concurrent goroutines
fn
func(ctx context.Context, item T) (R, error)
Function that transforms each item. It returns the transformed result, or an error to stop processing.
Behavior:
  • Processes items in parallel with bounded concurrency
  • Returns results in the same order as input items
  • On error, returns nil and the first error encountered
  • Cancels remaining work on first error
Example:
userIDs := []int{1, 2, 3, 4, 5}

users, err := worker.Map(ctx, userIDs, 3, func(ctx context.Context, id int) (User, error) {
    return fetchUser(ctx, id)
})

if err != nil {
    return err
}

// users[0] corresponds to userIDs[0], etc.
for _, user := range users {
    fmt.Println(user.Name)
}

Usage Examples

HTTP Request Processing

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    
    "github.com/aarock1234/go-template/pkg/worker"
)

// downloadFiles fetches every URL in urls, running at most ten downloads at
// a time, and prints the number of bytes received from each one.
//
// It returns the first error reported by worker.Run: a request that could not
// be built or sent, a response with a non-2xx status, or a body that could
// not be read. Every request carries ctx, so cancelling it aborts in-flight
// downloads.
func downloadFiles(ctx context.Context, urls []string) error {
    return worker.Run(ctx, urls, 10, func(ctx context.Context, url string) error {
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
        if err != nil {
            return err
        }

        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return err
        }
        defer resp.Body.Close()

        // An error page is not a successful download; fail instead of
        // counting its bytes as downloaded content.
        if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
            return fmt.Errorf("downloading %s: unexpected status %s", url, resp.Status)
        }

        data, err := io.ReadAll(resp.Body)
        if err != nil {
            return fmt.Errorf("reading body of %s: %w", url, err)
        }

        fmt.Printf("Downloaded %d bytes from %s\n", len(data), url)
        return nil
    })
}

Database Batch Processing

import (
    "context"
    "database/sql"
    
    "github.com/aarock1234/go-template/pkg/worker"
)

// Record is a single row to be written by processRecords.
type Record struct {
    // ID is stored in the "id" column of the processed table.
    ID   int
    // Data is stored in the "data" column of the processed table.
    Data string
}

// processRecords inserts each record into the processed table, issuing at
// most five inserts concurrently. It returns whatever error worker.Run
// reports.
func processRecords(ctx context.Context, db *sql.DB, records []Record) error {
    const insertStmt = "INSERT INTO processed (id, data) VALUES ($1, $2)"

    // insert writes one record; the ExecContext result itself is not needed.
    insert := func(ctx context.Context, rec Record) error {
        if _, err := db.ExecContext(ctx, insertStmt, rec.ID, rec.Data); err != nil {
            return err
        }
        return nil
    }

    return worker.Run(ctx, records, 5, insert)
}

Transform with Map

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"

    "github.com/aarock1234/go-template/pkg/worker"
)

// User is the JSON shape decoded by fetchUsers.
type User struct {
    // ID is read from the "id" JSON field.
    ID   int    `json:"id"`
    // Name is read from the "name" JSON field.
    Name string `json:"name"`
}

// fetchUsers loads the user for every ID in ids from the example API, with at
// most five requests in flight, and returns the users in the same order as
// ids.
//
// A request that cannot be built or sent, a response whose status is not
// 200 OK, or a body that cannot be decoded fails the whole call; the error
// comes back through worker.Map.
func fetchUsers(ctx context.Context, ids []int) ([]User, error) {
    return worker.Map(ctx, ids, 5, func(ctx context.Context, id int) (User, error) {
        url := fmt.Sprintf("https://api.example.com/users/%d", id)

        req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
        if err != nil {
            return User{}, err
        }

        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return User{}, err
        }
        defer resp.Body.Close()

        // Without this check an error body would decode into an empty or
        // partial User and be returned as if the lookup had succeeded.
        if resp.StatusCode != http.StatusOK {
            return User{}, fmt.Errorf("fetching user %d: unexpected status %s", id, resp.Status)
        }

        var user User
        if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
            return User{}, fmt.Errorf("decoding user %d: %w", id, err)
        }

        return user, nil
    })
}

File Processing

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "os"
    
    "github.com/aarock1234/go-template/pkg/worker"
)

// FileHash pairs a file path with the digest hashFiles computed for it.
type FileHash struct {
    // Path is the path the file was read from.
    Path string
    // Hash is the hex-encoded SHA-256 digest of the file's contents.
    Hash string
}

// hashFiles computes the SHA-256 digest of every file in paths, reading up to
// four files at a time, and returns one FileHash per path in input order.
// A file that cannot be read fails the call with an error naming that path.
func hashFiles(ctx context.Context, paths []string) ([]FileHash, error) {
    // hashOne reads a whole file into memory and hex-encodes its digest.
    hashOne := func(ctx context.Context, path string) (FileHash, error) {
        contents, err := os.ReadFile(path)
        if err != nil {
            return FileHash{}, fmt.Errorf("reading %s: %w", path, err)
        }

        sum := sha256.Sum256(contents)

        var result FileHash
        result.Path = path
        result.Hash = hex.EncodeToString(sum[:])
        return result, nil
    }

    return worker.Map(ctx, paths, 4, hashOne)
}

With Context Cancellation

import (
    "context"
    "fmt"
    "time"
    
    "github.com/aarock1234/go-template/pkg/worker"
)

func processWithTimeout() error {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    items := make([]int, 1000)
    for i := range items {
        items[i] = i
    }
    
    err := worker.Run(ctx, items, 10, func(ctx context.Context, item int) error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // Your processing logic
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("Processed: %d\n", item)
            return nil
        }
    })
    
    if err == context.DeadlineExceeded {
        fmt.Println("Processing timed out")
    }
    
    return err
}

Error Handling

import (
    "context"
    "errors"
    "fmt"
    "log"
    
    "github.com/aarock1234/go-template/pkg/worker"
)

// ErrInvalidItem is the sentinel error that processWithErrorHandling wraps
// when it encounters an empty item; match it with errors.Is.
var ErrInvalidItem = errors.New("invalid item")

func processWithErrorHandling(ctx context.Context, items []string) error {
    err := worker.Run(ctx, items, 5, func(ctx context.Context, item string) error {
        if item == "" {
            // This error will stop all processing
            return fmt.Errorf("%w: empty string", ErrInvalidItem)
        }
        
        log.Printf("Processing: %s", item)
        return doSomething(item)
    })
    
    if err != nil {
        if errors.Is(err, ErrInvalidItem) {
            return fmt.Errorf("validation failed: %w", err)
        }
        return fmt.Errorf("processing failed: %w", err)
    }
    
    return nil
}

Choosing Concurrency Level

CPU-Bound Work

For operations that primarily use CPU:
import "runtime"

// Size the pool to the number of logical CPUs reported by the runtime.
concurrency := runtime.NumCPU()
err := worker.Run(ctx, items, concurrency, cpuBoundWork)

I/O-Bound Work

For network requests, database queries, or file I/O:
// Higher concurrency for I/O operations; 50 is a starting point to tune for
// your workload, not a fixed rule.
concurrency := 50
err := worker.Run(ctx, items, concurrency, ioBoundWork)

Rate-Limited APIs

When calling rate-limited APIs:
// Lower concurrency to respect rate limits: at most five calls run at once.
concurrency := 5
err := worker.Run(ctx, items, concurrency, apiCall)

Best Practices

Use Run for side effects: When you only care about processing items without collecting results, use Run. It’s more memory efficient.
Use Map for transformations: When you need to collect results in order, use Map. Results are guaranteed to match input order.
First error stops all work: Both Run and Map stop processing on the first error. Design your error handling accordingly.
Choose appropriate concurrency: Start with runtime.NumCPU() for CPU-bound work, or 10-50 for I/O-bound work. Tune based on your specific use case.
Pass context to operations: Always pass the context parameter to operations like HTTP requests and database queries to ensure they can be cancelled.

Performance Characteristics

Memory:
  • Run: O(concurrency) goroutines
  • Map: O(concurrency) goroutines + O(n) result storage
Ordering:
  • Run: No ordering guarantees
  • Map: Results ordered same as input
Error Handling:
  • Both functions stop on first error
  • Remaining work is cancelled via context

Complete Example

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "time"
    
    "github.com/aarock1234/go-template/pkg/worker"
)

// Result records the outcome of requesting one URL.
type Result struct {
    // URL is the address that was requested.
    URL    string
    // Status is the HTTP status code of the response.
    Status int
    // Time is the elapsed time from just before the request was built until
    // the response was received.
    Time   time.Duration
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
    defer cancel()
    
    urls := []string{
        "https://api.example.com/endpoint1",
        "https://api.example.com/endpoint2",
        "https://api.example.com/endpoint3",
        "https://api.example.com/endpoint4",
        "https://api.example.com/endpoint5",
    }
    
    results, err := worker.Map(ctx, urls, 3, func(ctx context.Context, url string) (Result, error) {
        start := time.Now()
        
        req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
            return Result{}, fmt.Errorf("creating request: %w", err)
        }
        
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return Result{}, fmt.Errorf("executing request: %w", err)
        }
        defer resp.Body.Close()
        
        return Result{
            URL:    url,
            Status: resp.StatusCode,
            Time:   time.Since(start),
        }, nil
    })
    
    if err != nil {
        log.Fatalf("Processing failed: %v", err)
    }
    
    // Results are in same order as input URLs
    for i, result := range results {
        fmt.Printf("URL %d: %s - Status: %d - Time: %v\n",
            i+1, result.URL, result.Status, result.Time)
    }
}

Build docs developers (and LLMs) love