Skip to main content
In this lesson, we will discuss some advanced concurrency patterns in Go. Often, these patterns are used in combination in real-world applications.

Generator Pattern

Generator Pattern The generator pattern is used to generate a sequence of values which is used to produce some output. In our example, we have a generator function that simply returns a channel from which we can read the values. This works on the fact that sends and receives block until both the sender and receiver are ready. This property allowed us to wait until the next value is requested.
package main

import "fmt"

func main() {
	ch := generator()

	for i := 0; i < 5; i++ {
		value := <-ch
		fmt.Println("Value:", value)
	}
}

func generator() <-chan int {
	ch := make(chan int)

	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()

	return ch
}
If we run this, we’ll notice that we can consume values that were produced on demand:
$ go run main.go
Value: 0
Value: 1
Value: 2
Value: 3
Value: 4
This is similar behavior to yield in JavaScript and Python.

Fan-In Pattern

Fan-In Pattern The fan-in pattern combines multiple inputs into one single output channel. Basically, we multiplex our inputs. In our example, we create the inputs i1 and i2 using the generateWork function. Then we use our variadic function fanIn to combine values from these inputs to a single output channel from which we can consume values.
package main

import (
	"fmt"
	"sync"
)

func main() {
	i1 := generateWork([]int{0, 2, 6, 8})
	i2 := generateWork([]int{1, 3, 5, 7})

	out := fanIn(i1, i2)

	for value := range out {
		fmt.Println("Value:", value)
	}
}

func fanIn(inputs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	wg.Add(len(inputs))

	for _, in := range inputs {
		go func(ch <-chan int) {
			for {
				value, ok := <-ch

				if !ok {
					wg.Done()
					break
				}

				out <- value
			}
		}(in)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func generateWork(work []int) <-chan int {
	ch := make(chan int)

	go func() {
		defer close(ch)

		for _, w := range work {
			ch <- w
		}
	}()

	return ch
}
$ go run main.go
Value: 0
Value: 1
Value: 2
Value: 6
Value: 8
Value: 3
Value: 5
Value: 7
Note that the order of input will not be guaranteed with the fan-in pattern.

Fan-Out Pattern

Fan-Out Pattern Fan-out patterns allow us to essentially split our single input channel into multiple output channels. This is a useful pattern to distribute work items into multiple uniform actors. In our example, we break the input channel into 4 different output channels. For a dynamic number of outputs, we can merge outputs into a shared “aggregate” channel and use select.
package main

import "fmt"

func main() {
	work := []int{1, 2, 3, 4, 5, 6, 7, 8}
	in := generateWork(work)

	out1 := fanOut(in)
	out2 := fanOut(in)
	out3 := fanOut(in)
	out4 := fanOut(in)

	for range work {
		select {
		case value := <-out1:
			fmt.Println("Output 1 got:", value)
		case value := <-out2:
			fmt.Println("Output 2 got:", value)
		case value := <-out3:
			fmt.Println("Output 3 got:", value)
		case value := <-out4:
			fmt.Println("Output 4 got:", value)
		}
	}
}

func fanOut(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)

		for data := range in {
			out <- data
		}
	}()

	return out
}

func generateWork(work []int) <-chan int {
	ch := make(chan int)

	go func() {
		defer close(ch)

		for _, w := range work {
			ch <- w
		}
	}()

	return ch
}
As we can see, our work has been split between multiple goroutines:
$ go run main.go
Output 1 got: 1
Output 2 got: 3
Output 4 got: 4
Output 1 got: 5
Output 3 got: 2
Output 3 got: 6
Output 3 got: 7
Output 1 got: 8
The fan-out pattern is different from pub/sub patterns.

Pipeline Pattern

Pipeline Pattern The pipeline pattern is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines:
  • Receive values from upstream via inbound channels
  • Perform some function on that data, usually producing new values
  • Send values downstream via outbound channels
Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage is the sink or consumer.

Benefits

By using a pipeline, we separate the concerns of each stage, which provides numerous benefits such as:
  • Modify stages independent of one another
  • Mix and match how stages are combined independently of modifying the stage

Example

In our example, we have defined three stages: filter, square, and half:
package main

import (
	"fmt"
	"math"
)

func main() {
	in := generateWork([]int{0, 1, 2, 3, 4, 5, 6, 7, 8})

	out := filter(in) // Filter odd numbers
	out = square(out) // Square the input
	out = half(out)   // Half the input

	for value := range out {
		fmt.Println(value)
	}
}

func filter(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)

		for i := range in {
			if i%2 == 0 {
				out <- i
			}
		}
	}()

	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)

		for i := range in {
			value := math.Pow(float64(i), 2)
			out <- int(value)
		}
	}()

	return out
}

func half(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)

		for i := range in {
			value := i / 2
			out <- value
		}
	}()

	return out
}

func generateWork(work []int) <-chan int {
	ch := make(chan int)

	go func() {
		defer close(ch)

		for _, w := range work {
			ch <- w
		}
	}()

	return ch
}
Seems like our input was processed correctly by the pipeline in a concurrent manner:
$ go run main.go
0
2
8
18
32
Pipelines are a powerful pattern for building complex data processing systems with clean separation of concerns.

Worker Pool Pattern

Worker Pool Pattern The worker pool is a really powerful pattern that lets us distribute work across multiple workers (goroutines) concurrently. In our example, we have a jobs channel to which we will send our jobs and a results channel where our workers will send the results once they’ve finished doing the work. After that, we can launch our workers concurrently and simply receive the results from the results channel.
package main

import (
	"fmt"
	"sync"
)

const totalJobs = 4
const totalWorkers = 2

func main() {
	jobs := make(chan int, totalJobs)
	results := make(chan int, totalJobs)

	for w := 1; w <= totalWorkers; w++ {
		go worker(w, jobs, results)
	}

	// Send jobs
	for j := 1; j <= totalJobs; j++ {
		jobs <- j
	}

	close(jobs)

	// Receive results
	for a := 1; a <= totalJobs; a++ {
		<-results
	}

	close(results)
}

func worker(id int, jobs <-chan int, results chan<- int) {
	var wg sync.WaitGroup

	for j := range jobs {
		wg.Add(1)

		go func(job int) {
			defer wg.Done()

			fmt.Printf("Worker %d started job %d\n", id, job)

			// Do work and send result
			result := job * 2
			results <- result

			fmt.Printf("Worker %d finished job %d\n", id, job)
		}(j)
	}

	wg.Wait()
}
As expected, our jobs were distributed among our workers:
$ go run main.go
Worker 2 started job 4
Worker 2 started job 1
Worker 1 started job 3
Worker 2 started job 2
Worker 2 finished job 1
Worker 1 finished job 3
Worker 2 finished job 2
Worker 2 finished job 4
Ideally, totalWorkers should be set to runtime.NumCPU() which gives us the number of logical CPUs usable by the current process.

Queuing Pattern

Queuing Pattern The queuing pattern allows us to process n number of items at a time. In our example, we use a buffered channel to simulate queue behavior. We simply send an empty struct to our queue channel and wait for it to be released by the previous process so that we can continue. This is because sends to a buffered channel block only when the buffer is full and receives block when the buffer is empty. Here, we have total work of 10 items and we have a limit of 2. This means we can process 2 items at a time.
package main

import (
	"fmt"
	"sync"
	"time"
)

const limit = 2
const work = 10

func main() {
	var wg sync.WaitGroup

	fmt.Println("Queue limit:", limit)
	queue := make(chan struct{}, limit)

	wg.Add(work)

	for w := 1; w <= work; w++ {
		process(w, queue, &wg)
	}

	wg.Wait()

	close(queue)
	fmt.Println("Work complete")
}

func process(work int, queue chan struct{}, wg *sync.WaitGroup) {
	queue <- struct{}{}

	go func() {
		defer wg.Done()

		time.Sleep(1 * time.Second)
		fmt.Println("Processed:", work)

		<-queue
	}()
}
If we run this, we will notice that it briefly pauses when every 2nd item (which is our limit) is processed as our queue waits to be dequeued:
$ go run main.go
Queue limit: 2
Processed: 1
Processed: 2
Processed: 4
Processed: 3
Processed: 5
Processed: 6
Processed: 8
Processed: 7
Processed: 9
Processed: 10
Work complete
Notice how our queue channel is of type struct{} as an empty struct occupies zero bytes of storage.

Error Handling in Concurrent Code

When working with concurrent patterns, proper error handling becomes crucial. Here are some common approaches:

Using Error Channels

One approach is to use a dedicated error channel:
func worker(id int, jobs <-chan int, results chan<- int, errors chan<- error) {
	for j := range jobs {
		result, err := doWork(j)
		if err != nil {
			errors <- fmt.Errorf("worker %d: %w", id, err)
			continue
		}
		results <- result
	}
}

Using Result Structs

Another approach is to combine results and errors in a struct:
type Result struct {
	Value int
	Err   error
}

func worker(id int, jobs <-chan int, results chan<- Result) {
	for j := range jobs {
		value, err := doWork(j)
		results <- Result{Value: value, Err: err}
	}
}
Always handle errors properly in concurrent code. Ignoring errors in goroutines can lead to silent failures that are difficult to debug.

Additional Patterns

Some additional patterns that might be useful to know:
  • Tee channel - Splits one input into two identical outputs
  • Bridge channel - Consumes a channel of channels and produces a single channel
  • Ring buffer channel - Circular buffer implementation using channels
  • Bounded parallelism - Limits the number of concurrent operations
These patterns can be combined to build sophisticated concurrent systems. Start simple and add complexity only when needed.

Build docs developers (and LLMs) love