Go Worker Pools
Introduction
Worker pools are a powerful concurrency pattern in Go that allows you to control and limit the number of goroutines running simultaneously. This pattern is essential for efficiently managing resource-intensive tasks, preventing system overload, and optimizing performance in concurrent applications.
In this tutorial, we'll explore how worker pools work, why they're important, and how to implement them in Go. By the end, you'll understand how to efficiently distribute work across multiple goroutines and manage concurrent execution in a controlled manner.
Understanding Worker Pools
A worker pool is a collection of goroutines (workers) that wait for tasks to be assigned to them. Each worker processes tasks independently, allowing for parallel execution. Once a worker completes a task, it becomes available to process the next one.
Here's why worker pools are useful:
- Resource Control: Limit the number of concurrent operations to avoid overwhelming system resources
- Load Distribution: Efficiently distribute tasks across available workers
- Performance Optimization: Process multiple tasks in parallel while maintaining control
- Graceful Shutdown: Coordinate the completion of all tasks before program termination
Key Concept: Worker pools help you control the degree of concurrency in your application, preventing the unbounded creation of goroutines that could lead to resource exhaustion.
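To make the idea concrete before we build a full pool, here's a minimal sketch (assuming an arbitrary limit of 3) that uses a buffered channel as a counting semaphore to cap how many goroutines do work at the same time:
package main

import (
	"fmt"
	"sync"
)

func main() {
	// A buffered channel used as a counting semaphore:
	// at most 3 tasks may hold a slot at any moment.
	sem := make(chan struct{}, 3)
	var wg sync.WaitGroup

	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot (blocks while 3 are busy)
			defer func() { <-sem }() // release the slot when done
			fmt.Printf("task %d running\n", id)
		}(i)
	}
	wg.Wait()
}
Note that this still launches one goroutine per task; a worker pool goes a step further by reusing a small, fixed set of goroutines, which is what the rest of this tutorial builds.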
Basic Worker Pool Architecture
The basic architecture of a worker pool consists of:
- Tasks Channel: Distributes work to available workers
- Worker Goroutines: Process tasks concurrently
- Results Channel: Collects the results of completed tasks
- Synchronization Mechanism: Coordinates worker startup and shutdown
Implementing a Basic Worker Pool
Let's build a simple worker pool that processes jobs (in this case, calculating the square of numbers):
package main
import (
	"fmt"
	"time"
)
// Job represents a task to be processed
type Job struct {
	ID     int
	Number int
}
// Result represents the outcome of processing a job
type Result struct {
	JobID  int
	Number int
	Square int
}
func main() {
	// Create channels for jobs and results
	jobs := make(chan Job, 100)
	results := make(chan Result, 100)
	// Start 3 worker goroutines
	numWorkers := 3
	for w := 1; w <= numWorkers; w++ {
		go worker(w, jobs, results)
	}
	// Send 5 jobs
	numJobs := 5
	for j := 1; j <= numJobs; j++ {
		jobs <- Job{ID: j, Number: j}
	}
	close(jobs)
	// Collect all results
	for a := 1; a <= numJobs; a++ {
		result := <-results
		fmt.Printf("Result: Job ID %d, Number %d, Square %d
",
			result.JobID, result.Number, result.Square)
	}
}
// worker processes jobs from the jobs channel and sends results
// to the results channel
func worker(id int, jobs <-chan Job, results chan<- Result) {
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d
", id, job.ID)
		time.Sleep(time.Second) // Simulate work
		results <- Result{
			JobID:  job.ID,
			Number: job.Number,
			Square: job.Number * job.Number,
		}
	}
}
Sample Output:
Worker 1 processing job 1
Worker 3 processing job 3
Worker 2 processing job 2
Worker 1 processing job 4
Worker 3 processing job 5
Result: Job ID 1, Number 1, Square 1
Result: Job ID 3, Number 3, Square 9
Result: Job ID 2, Number 2, Square 4
Result: Job ID 4, Number 4, Square 16
Result: Job ID 5, Number 5, Square 25
Code Explanation
- We define Job and Result types to represent our tasks and their outcomes.
- We create buffered channels for jobs and results.
- We start a fixed number of worker goroutines.
- Each worker processes jobs from the jobs channel and sends the results to the results channel.
- The main goroutine sends jobs and collects results.
Using WaitGroups for Coordination
The previous example works, but it assumes we know exactly how many results to expect. Let's improve our worker pool by using sync.WaitGroup to properly coordinate the completion of all tasks:
package main
import (
	"fmt"
	"sync"
	"time"
)
// Job represents a task to be processed
type Job struct {
	ID     int
	Number int
}
// Result represents the outcome of processing a job
type Result struct {
	JobID  int
	Number int
	Square int
}
func main() {
	// Create channels for jobs and results
	jobs := make(chan Job, 100)
	results := make(chan Result, 100)
	// WaitGroup to wait for all workers to finish
	var wg sync.WaitGroup
	// Start 3 worker goroutines
	numWorkers := 3
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, jobs, results, &wg)
	}
	// Send 5 jobs
	numJobs := 5
	for j := 1; j <= numJobs; j++ {
		jobs <- Job{ID: j, Number: j}
	}
	close(jobs)
	// Start a goroutine to close the results channel after all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()
	// Collect all results
	for result := range results {
		fmt.Printf("Result: Job ID %d, Number %d, Square %d
",
			result.JobID, result.Number, result.Square)
	}
}
// worker processes jobs from the jobs channel and sends results
// to the results channel
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d
", id, job.ID)
		time.Sleep(time.Second) // Simulate work
		results <- Result{
			JobID:  job.ID,
			Number: job.Number,
			Square: job.Number * job.Number,
		}
	}
}
Key Improvements:
- We use a sync.WaitGroup to track when all workers have finished.
- We close the results channel only after all workers are done, allowing us to range over the results.
- This pattern ensures we don't miss any results and properly coordinate shutdown.
Real-World Example: Image Processing
Let's look at a more practical example: using a worker pool to process multiple images concurrently. We'll simulate image processing by applying a "filter" to each image:
package main
import (
	"fmt"
	"sync"
	"time"
)
// ImageTask represents an image to be processed
type ImageTask struct {
	ID   int
	Name string
	Size int // Size in KB
}
// ProcessedImage represents a processed image
type ProcessedImage struct {
	TaskID       int
	OriginalName string
	NewName      string
	ProcessTime  time.Duration
}
// Simulates applying a filter to an image
func applyFilter(img ImageTask) ProcessedImage {
	// Processing time proportional to image size
	processTime := time.Duration(img.Size/10) * time.Millisecond
	time.Sleep(processTime)
	
	return ProcessedImage{
		TaskID:       img.ID,
		OriginalName: img.Name,
		NewName:      fmt.Sprintf("%s_filtered.jpg", img.Name[:len(img.Name)-4]), // strip the ".jpg" suffix
		ProcessTime:  processTime,
	}
}
func worker(id int, images <-chan ImageTask, results chan<- ProcessedImage, wg *sync.WaitGroup) {
	defer wg.Done()
	for img := range images {
		fmt.Printf("Worker %d processing image: %s
", id, img.Name)
		processed := applyFilter(img)
		results <- processed
	}
}
func main() {
	// Create sample images
	sampleImages := []ImageTask{
		{ID: 1, Name: "vacation.jpg", Size: 2500},
		{ID: 2, Name: "family.jpg", Size: 3200},
		{ID: 3, Name: "birthday.jpg", Size: 1800},
		{ID: 4, Name: "wedding.jpg", Size: 4500},
		{ID: 5, Name: "graduation.jpg", Size: 2800},
		{ID: 6, Name: "party.jpg", Size: 1500},
		{ID: 7, Name: "trip.jpg", Size: 3600},
		{ID: 8, Name: "concert.jpg", Size: 2200},
	}
	// Create channels
	imageTasks := make(chan ImageTask, len(sampleImages))
	processedImages := make(chan ProcessedImage, len(sampleImages))
	// Start worker pool with 3 workers
	var wg sync.WaitGroup
	numWorkers := 3
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, imageTasks, processedImages, &wg)
	}
	// Send all images to be processed
	startTime := time.Now()
	for _, img := range sampleImages {
		imageTasks <- img
	}
	close(imageTasks)
	// Wait for all workers to finish and close results channel
	go func() {
		wg.Wait()
		close(processedImages)
	}()
	// Collect and report results
	totalProcessed := 0
	for processed := range processedImages {
		totalProcessed++
		fmt.Printf("Processed: %s → %s (took %v)
", 
			processed.OriginalName, 
			processed.NewName, 
			processed.ProcessTime)
	}
	totalTime := time.Since(startTime)
	fmt.Printf("
Summary: Processed %d images in %v using %d workers
", 
		totalProcessed, totalTime, numWorkers)
	
	// Calculate theoretical time if done sequentially
	var sequentialTime time.Duration
	for _, img := range sampleImages {
		sequentialTime += time.Duration(img.Size/10) * time.Millisecond
	}
	fmt.Printf("Sequential processing would take approximately: %v
", sequentialTime)
	fmt.Printf("Speedup factor: %.2fx
", float64(sequentialTime)/float64(totalTime))
}
Sample Output:
Worker 1 processing image: vacation.jpg
Worker 2 processing image: family.jpg
Worker 3 processing image: birthday.jpg
Processed: vacation.jpg → vacation_filtered.jpg (took 250ms)
Worker 1 processing image: wedding.jpg
Processed: birthday.jpg → birthday_filtered.jpg (took 180ms)
Worker 3 processing image: graduation.jpg
Processed: family.jpg → family_filtered.jpg (took 320ms)
Worker 2 processing image: party.jpg
Processed: party.jpg → party_filtered.jpg (took 150ms)
Worker 2 processing image: trip.jpg
Processed: graduation.jpg → graduation_filtered.jpg (took 280ms)
Worker 3 processing image: concert.jpg
Processed: wedding.jpg → wedding_filtered.jpg (took 450ms)
Processed: concert.jpg → concert_filtered.jpg (took 220ms)
Processed: trip.jpg → trip_filtered.jpg (took 360ms)
Summary: Processed 8 images in 970ms using 3 workers
Sequential processing would take approximately: 2.21s
Speedup factor: 2.28x
Key Takeaways from the Image Processing Example:
- Parallel Processing: We process multiple images simultaneously, which leads to a significant speedup.
- Resource Utilization: We limit concurrent processing to 3 workers, preventing system overload.
- Task Distribution: The worker pool automatically distributes tasks to available workers.
- Performance Gain: The example shows a clear performance advantage over sequential processing.
Advanced Worker Pool Patterns
Worker Pool with Timeouts
In real-world applications, you might want to implement timeouts to prevent workers from getting stuck on problematic tasks:
package main
import (
	"fmt"
	"sync"
	"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("Worker %d started job %d
", id, j)
		
		// Buffered so the goroutine below can always send its signal,
		// even if this worker has already timed out and moved on
		// (an unbuffered channel would leak that goroutine)
		done := make(chan bool, 1)
		
		// Process the job in a separate goroutine
		go func() {
			// Simulate work that might take a variable amount of time
			time.Sleep(time.Duration(j*200) * time.Millisecond)
			done <- true
		}()
		
		// Wait for job completion or timeout
		select {
		case <-done:
			// Job completed successfully
			fmt.Printf("Worker %d finished job %d
", id, j)
			results <- j * j
		case <-time.After(1 * time.Second):
			// Job timed out; the detached goroutine will still finish
			// and send into the buffered done channel before exiting
			fmt.Printf("Worker %d timeout on job %d\n", id, j)
			results <- -j // Indicate error with negative result
		}
	}
}
func main() {
	jobs := make(chan int, 10)
	results := make(chan int, 10)
	
	var wg sync.WaitGroup
	numWorkers := 3
	
	// Start workers
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, jobs, results, &wg)
	}
	
	// Send jobs
	for j := 1; j <= 9; j++ {
		jobs <- j
	}
	close(jobs)
	
	// Collect results
	go func() {
		wg.Wait()
		close(results)
	}()
	
	// Process results
	for r := range results {
		if r < 0 {
			fmt.Printf("Result: Job %d timed out
", -r)
		} else {
			fmt.Printf("Result: %d
", r)
		}
	}
}
This implementation uses a select statement to race each job against a timeout, so a worker never gets stuck waiting on a single job. Note that the timed-out work itself keeps running in the background until it finishes; the buffered done channel simply lets that goroutine exit cleanly instead of leaking.
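As a variation, Go's context package expresses the same per-job deadline more idiomatically and lets the work function observe cancellation instead of being abandoned. Here's a minimal sketch under the assumption that the simulated work is the same sleep as above (the process function is illustrative, not part of the earlier example):
package main

import (
	"context"
	"fmt"
	"time"
)

// process simulates work that respects cancellation: it returns
// early with an error if the context's deadline expires first.
func process(ctx context.Context, job int) (int, error) {
	select {
	case <-time.After(time.Duration(job*200) * time.Millisecond):
		return job * job, nil // finished within the deadline
	case <-ctx.Done():
		return 0, ctx.Err() // deadline exceeded or cancelled
	}
}

func main() {
	for job := 1; job <= 7; job++ {
		// Give each job its own one-second deadline.
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		if result, err := process(ctx, job); err != nil {
			fmt.Printf("job %d failed: %v\n", job, err)
		} else {
			fmt.Printf("job %d result: %d\n", job, result)
		}
		cancel() // release the context's timer resources promptly
	}
}
The advantage over the done-channel version is that no goroutine is left running after a timeout: the select inside process returns as soon as the deadline passes.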
Dynamically Adjusting the Worker Pool
In some cases, you might want to adjust the number of workers based on the system load:
package main
import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)
type WorkerPool struct {
	jobs       chan int
	results    chan int
	wg         sync.WaitGroup
	activeJobs int32
	maxWorkers int
	minWorkers int
}
func NewWorkerPool(minWorkers, maxWorkers int) *WorkerPool {
	return &WorkerPool{
		jobs:       make(chan int, 100),
		results:    make(chan int, 100),
		activeJobs: 0,
		maxWorkers: maxWorkers,
		minWorkers: minWorkers,
	}
}
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	
	for j := range wp.jobs {
		atomic.AddInt32(&wp.activeJobs, 1)
		fmt.Printf("Worker %d processing job %d
", id, j)
		
		// Simulate work
		time.Sleep(500 * time.Millisecond)
		wp.results <- j * j
		
		atomic.AddInt32(&wp.activeJobs, -1)
	}
}
func (wp *WorkerPool) Start() {
	// Start minimum number of workers
	for i := 1; i <= wp.minWorkers; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
	
	// Monitor job queue and start additional workers if needed
	go wp.monitor()
}
func (wp *WorkerPool) monitor() {
	workerCount := wp.minWorkers
	
	for {
		time.Sleep(200 * time.Millisecond)
		queueSize := len(wp.jobs)
		activeJobs := atomic.LoadInt32(&wp.activeJobs)
		
		// If queue is filling up, add more workers
		if queueSize > 5 && workerCount < wp.maxWorkers {
			workerCount++
			wp.wg.Add(1)
			fmt.Printf("Starting additional worker #%d
", workerCount)
			go wp.worker(workerCount)
		}
		
		// Exit once the queue is drained and nothing is in flight.
		// (A simple heuristic: a lull between slow submissions can also
		// trigger this, which merely stops further scaling up.)
		if queueSize == 0 && activeJobs == 0 {
			break
		}
	}
}
func (wp *WorkerPool) Submit(job int) {
	wp.jobs <- job
}
func (wp *WorkerPool) Close() {
	close(wp.jobs)
	wp.wg.Wait()
	close(wp.results)
}
func (wp *WorkerPool) Results() <-chan int {
	return wp.results
}
func main() {
	// Create a pool with min 2, max 5 workers
	pool := NewWorkerPool(2, 5)
	pool.Start()
	
	// Submit jobs with a varying rate
	go func() {
		for i := 1; i <= 20; i++ {
			pool.Submit(i)
			
			// Submit jobs quickly for a burst, then slow down
			if i < 10 {
				time.Sleep(100 * time.Millisecond) // Fast submission
			} else {
				time.Sleep(600 * time.Millisecond) // Slow submission
			}
		}
		pool.Close()
	}()
	
	// Collect results
	for r := range pool.Results() {
		fmt.Printf("Got result: %d
", r)
	}
}
This dynamic worker pool adjusts the number of workers based on the queue size, allowing it to handle bursts of activity more efficiently.
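One caveat: this pool only ever scales up, since workerCount never decreases. If you also want to scale down, a common approach is to let extra workers retire after sitting idle. The sketch below is a hypothetical variant of the worker method (it assumes the same WorkerPool type as above and an idleTimeout parameter of your choosing):
// elasticWorker behaves like worker, but retires if no job arrives
// within idleTimeout. Hypothetical addition to the WorkerPool above.
func (wp *WorkerPool) elasticWorker(id int, idleTimeout time.Duration) {
	defer wp.wg.Done()
	for {
		select {
		case j, ok := <-wp.jobs:
			if !ok {
				return // jobs channel closed: shut down
			}
			atomic.AddInt32(&wp.activeJobs, 1)
			wp.results <- j * j
			atomic.AddInt32(&wp.activeJobs, -1)
		case <-time.After(idleTimeout):
			fmt.Printf("worker %d idle, retiring\n", id)
			return // no work arrived in time: scale back down
		}
	}
}
The monitor would then also need to decrement its workerCount when a worker retires (for example via a channel or another atomic counter), which is why scale-down logic is often left out of simple pools.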
Best Practices for Worker Pools
- Determine Optimal Worker Count: The ideal number of workers depends on your workload: CPU-bound tasks rarely benefit from more workers than CPU cores, while I/O-bound tasks can often use many more. As a starting point, you can use the number of CPU cores available:
numWorkers := runtime.NumCPU()
- Use Buffered Channels: Buffered channels help smooth out uneven processing rates between producers and consumers.
- Graceful Shutdown: Always ensure all workers finish their tasks properly before the program exits.
- Error Handling: Include a mechanism for workers to report errors without crashing the entire pool (see the sketch after this list).
- Monitoring and Metrics: Track worker pool performance to identify bottlenecks.
- Avoid Worker Starvation: Ensure that work is distributed fairly among workers.
- Consider Task Prioritization: Some tasks may be more important than others and should be processed first.
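For the error-handling point, the usual pattern is to carry an error inside the result type rather than letting a worker fail silently or crash. A minimal sketch, where doWork and its simulated failure are stand-ins for real task logic:
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Result carries either a value or an error, never a panic.
type Result struct {
	JobID int
	Value int
	Err   error
}

// doWork is a stand-in for real task logic that can fail.
func doWork(job int) (int, error) {
	if job%4 == 0 {
		return 0, errors.New("simulated failure")
	}
	return job * job, nil
}

func main() {
	jobs := make(chan int, 10)
	results := make(chan Result, 10)
	var wg sync.WaitGroup

	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				value, err := doWork(job)
				// Report the error through the results channel
				// instead of crashing the worker.
				results <- Result{JobID: job, Value: value, Err: err}
			}
		}()
	}

	for j := 1; j <= 8; j++ {
		jobs <- j
	}
	close(jobs)

	go func() { wg.Wait(); close(results) }()

	for r := range results {
		if r.Err != nil {
			fmt.Printf("job %d failed: %v\n", r.JobID, r.Err)
		} else {
			fmt.Printf("job %d = %d\n", r.JobID, r.Value)
		}
	}
}
The consumer decides what to do with failures; the workers themselves keep running.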
Common Pitfalls
- Creating Too Many Workers: Having too many concurrent goroutines can lead to excessive context switching and memory usage.
- Forgetting to Close Channels: Failing to close channels can lead to goroutine leaks.
- Deadlocks: Be careful with channel operations to avoid situations where workers are waiting for each other.
- Race Conditions: Use proper synchronization mechanisms when accessing shared resources.
- Not Handling Panics: A panic in one worker can bring down the entire application (a recovery sketch follows this list).
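On the last pitfall, each worker can protect itself with a deferred recover, converting a panic into an ordinary error result. A minimal, self-contained sketch (the failing divideInto function is purely illustrative):
package main

import "fmt"

// safeProcess runs fn for one job and converts a panic into an error,
// so one bad task cannot crash the worker goroutine or the whole pool.
func safeProcess(job int, fn func(int) int) (result int, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job %d panicked: %v", job, r)
		}
	}()
	return fn(job), nil
}

func main() {
	divideInto := func(n int) int { return 100 / (n - 3) } // panics when n == 3
	for job := 1; job <= 5; job++ {
		if result, err := safeProcess(job, divideInto); err != nil {
			fmt.Println(err)
		} else {
			fmt.Printf("job %d = %d\n", job, result)
		}
	}
}
Inside a real worker loop you would wrap each job in a helper like safeProcess and send the error through the results channel, as in the error-handling sketch above.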
Summary
Worker pools are a fundamental concurrency pattern in Go that help you efficiently distribute work across multiple goroutines while maintaining control over resource usage. By limiting the number of concurrent operations, you can optimize performance and prevent system overload.
In this tutorial, we've covered:
- The concept and benefits of worker pools
- Basic worker pool implementation using goroutines and channels
- Coordinating workers using WaitGroups
- Real-world applications like image processing
- Advanced patterns like timeouts and dynamic worker pools
- Best practices and common pitfalls
Worker pools are just one of many concurrency patterns available in Go. As you become more familiar with Go's concurrency features, you'll discover more sophisticated patterns for handling complex concurrent workflows.
Exercises
- Modify the basic worker pool to handle errors gracefully.
- Implement a worker pool that can prioritize certain tasks.
- Create a worker pool that can be paused and resumed.
- Build a web server that uses a worker pool to handle incoming requests.
- Implement a worker pool that can process different types of tasks.
Additional Resources
- Go Concurrency Patterns: Pipelines and cancellation
- Go Concurrency Patterns: Context
- Concurrency in Go by Katherine Cox-Buday
- Go by Example: Worker Pools
- The Go Programming Language by Alan A. A. Donovan and Brian W. Kernighan