Mastering 6 Golang Concurrency Patterns to Level Up Your Apps

Phuong Le, Relia Software

6 common Golang Concurrency Patterns are: Generator Pattern, Worker Pool Pattern, Pipeline Pattern, Fan-In Pattern, Semaphore Pattern, and Timeout Pattern.

The way Go handles concurrency is unique compared to other languages, and it's one of the reasons the language is so well suited to programs that do many things at once. To really take advantage of that, we need to understand how it works, and design patterns are a big help here.

In this blog, we're diving into 6 common concurrency design patterns that help us manage our concurrency code more effectively.

Generator Pattern

The generator pattern is a way to create a function that gives us a series of values. In other programming languages, people might use things called enumerators or iterators. But in Go, the best tool for this job is a channel.

Think of it this way: we have a generator function that gives us something like a list we can pull values from, one at a time.

When we receive values from this list, we can use a for range loop in our code to handle each value as it comes.

func evenGenerator() <-chan int {
	c := make(chan int)

	go func() {
		for i := 0; ; i += 2 {
			c <- i
		}
	}()

	return c
}

In this evenGenerator example, we make a channel c that works only with integers. The return type (<-chan int) means that whoever receives this channel can only read numbers from it, not send their own numbers into it.

Inside evenGenerator, we start a new goroutine with go func(), and that goroutine just keeps sending even numbers into the channel c.

To get the values from this channel, the caller can use a loop to listen and print the values. Here's how we do it in the main part of our program:

func main() {
	for v := range evenGenerator() {
		fmt.Println(v)
	}
}

This way of doing things is good because it lets us work on two things at once. We can have values being made in one part of our program while another part does something else.

Here are the steps to make this pattern work:

  1. Make a generator function: This function will give us back a channel and it is like a pipe sending out the values.
  2. Make a channel: Decide if we want this channel to do two-way communication or just one way.
  3. Start a separate goroutine: This will create the values and send them through the channel.
  4. Give back the channel: We return the channel that can only be used to get the values to the person who asked for it.
  5. Use the values: The person who got the channel can use a loop to get and use the values from the channel.

"Why just not loop over the values in the generator function and return a slice?"

The main point here is that a generator lets us handle the data one piece at a time. It gives us each value only when we ask for it. This is especially useful when we're dealing with really large amounts of data that won't fit in memory all at once.

Remember, though, don't make things more complicated than they need to be. If we're working with a small group of items that our computer can handle easily, then just putting those items in a list and giving back that list is perfectly fine.
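One more practical note: the evenGenerator above runs forever, so in a long-lived program its goroutine has no way to stop. A common remedy (a minimal sketch, not part of the original example; it assumes the standard context package is imported, and evenGeneratorCtx is a made-up name) is to let the caller cancel it:

// evenGeneratorCtx is a stoppable variant: the goroutine exits, and closes
// the channel, as soon as the caller cancels the passed-in context.
func evenGeneratorCtx(ctx context.Context) <-chan int {
	c := make(chan int)

	go func() {
		defer close(c)
		for i := 0; ; i += 2 {
			select {
			case <-ctx.Done():
				return // the caller no longer wants values; stop producing
			case c <- i:
			}
		}
	}()

	return c
}

The caller would create the context with context.WithCancel and call cancel() once it has read enough values.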

Worker Pool Pattern

In the worker pool pattern, we use multiple workers to handle jobs. Each worker is a separate process, and we have a way of assigning tasks to these workers. In Go, we use goroutines for the workers and channels to give them tasks and to receive the results back.

A worker is just a function that runs in its own goroutine and is designed to do jobs. Usually, a worker keeps running, always ready to pick up a new job from a channel.

When it gets a job, it does the work needed and then gets ready for the next job:

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, j)

		time.Sleep(time.Second) // simulate time-consuming task
		results <- j * j        // square the number and send the result back

		fmt.Printf("Worker %d finished job %d\n", id, j)
	}
}

In the example given, we're assuming that the job of squaring a number takes 1 second.

Let's look at what's happening in the worker function:

  • The worker gets jobs from the jobs channel and, for each job, squares the number. The 1-second wait with time.Sleep is there to make it act like it's busy with the task.
  • After it's done with the task, it sends the result back on the results channel.
  • The worker function also takes an id, so we know which worker is doing the task, and a wait group wg so we know when it has finished all its work.

When the jobs channel is closed, which means nothing more is coming in, the worker will stop getting new jobs and the loop will end.

Let's put this into action with a group of workers, we'll assign them jobs to do and then look at what they've done:

// Set up the queue
numJobs := 10
numWorkers := 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)

var wg sync.WaitGroup

// Start the workers.
for w := 1; w <= numWorkers; w++ {
	wg.Add(1)
	go worker(w, jobs, results, &wg)
}

// Send jobs to the workers and close the jobs channel when done.
for j := 1; j <= numJobs; j++ {
	jobs <- j
}
close(jobs)

// Wait for all the workers to finish, then close the results channel.
go func() {
	wg.Wait()
	close(results)
}()

// Collect and print the results.
for result := range results {
	fmt.Printf("Result: %d\n", result)
}

In this setup, we have 3 workers, each with their own ID: 1, 2, and 3 and there are also 10 tasks they need to work on.

Every worker has their own goroutine and is always ready to take on a job from the jobs channel. Also, we can skip the results channel if we don't need to keep track of them.

"But what if all workers are busy?"

Well, if all workers are already busy, any new task will just have to wait in the jobs channel until one of the workers is free to take it.

In simple terms, the new task just sits there, and nothing happens with it until there's a worker ready to pick it up.

"Why use a worker pool? Why set a limit?"

Having more goroutines doesn't always lead to better performance. If we start a new goroutine every time we get a request, it can actually slow things down because it takes effort to allocate and keep track of all of them.

However, this doesn't mean we always continue with the same number of workers. We can change the number of workers to match the amount of work we have.
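As a rough illustration (a sketch, not from the original; starting from the CPU count suits CPU-bound jobs, while I/O-bound jobs often benefit from a higher cap), the pool size could be chosen like this:

// chooseWorkers is an illustrative helper: start from the number of CPUs
// and never spin up more workers than there are queued jobs.
func chooseWorkers(queuedJobs int) int {
	n := runtime.NumCPU()
	if queuedJobs < n {
		n = queuedJobs
	}
	if n < 1 {
		n = 1
	}
	return n
}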

Pipeline Pattern

The pipeline pattern is a way of splitting tasks where each step does a particular job. The output of one step becomes the input for the next and this technique is good for tasks that can be divided into smaller, separate jobs.

Let's say you start with the number 1 and end up with the number 9 after all the steps:

# 1 -> 2 -> 3 -> 9
1 | multiply by 2 | add 1 | square = 9

# 4 -> 8 -> 9 -> 81
4 | multiply by 2 | add 1 | square = 81

What's really great about using a pipeline in Go is that, we can work on a bunch of numbers all at the same time.

While one number is being squared at the square step, another can be going through add 1, and yet another through multiply by 2 at the first step. So, we have a team where everyone is doing their job at the same time, which makes everything go faster.

To build this pipeline in Go, here's the solution we follow:

  • Each step communicates with the next one through channels and the output from one step is sent directly into the input channel of the next step.
  • At each step, we use goroutines to receive data from the input channel and send it to the output channel.
  • We connect the steps by passing the output channel from one step to the input channel of the next step.

Let's start with just the first step, multiply by 2. The other steps follow a similar pattern, so we can try writing them on our own to get some practice:

// Step 1: Multiply by 2
func multiplyByTwo(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * 2 // Take n, double it, and send it to the outgoing channel
		}
		close(out)
	}()
	return out
}

We just define a step in our pipeline: it takes an input channel in and returns an output channel. Inside it, we start a goroutine that processes each incoming number, performs the needed action, and passes it along to the next step.
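For reference, here is one way the remaining two stages could look; they follow exactly the same shape as multiplyByTwo (the original leaves them as an exercise, so treat this as a sketch):

// Step 2: Add 1
func addOne(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n + 1 // take n, add one, pass it along
		}
		close(out)
	}()
	return out
}

// Step 3: Square
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n // take n, square it, pass it along
		}
		close(out)
	}()
	return out
}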

To connect these steps and run the entire pipeline, we would do something like this:

func main() {
	// Start with a set of numbers
	in := make(chan int)

	// Set up the pipeline
	c1 := multiplyByTwo(in)
	c2 := addOne(c1)
	c3 := square(c2)

	// Send numbers into the pipeline and close the input channel when done
	go func() {
		for _, n := range []int{1, 4} {
			in <- n
		}
		close(in)
	}()

	// Receive the final output from the pipeline
	for result := range c3 {
		fmt.Println(result)
	}
}

Here, we're loading our numbers into a channel named in. We then set up our pipeline by linking our functions: the multiplyByTwo function feeds into addOne, and the output of addOne goes into square.

That's how we build our pipeline and when we run this main function, it will display the results after the numbers have passed through all the stages.
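As a quick sanity check (not spelled out in the original): 1 becomes 2, then 3, then 9, and 4 becomes 8, then 9, then 81, so this main prints 9 followed by 81. The order is preserved here because each stage is a single goroutine processing values one at a time.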

Fan-In Pattern

The fan-in pattern is about taking results from several sources and putting them together into one source.

In Go, if we have a bunch of different channels (sources) and we want to bring them all into one channel, this is what we'd use. There are several ways to do it, but here's a typical method:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:  c <- s
            case s := <-input2:  c <- s
            }
        }
    }()
    return c
}

With this setup, we have two channels called input1 and input2. We listen to them and whenever we get something, we send it to the channel c.

But there's a big issue with this function, do you see it before we get into it?

The problem is that the fanIn function doesn't have a way to stop the goroutine after input1 and input2 are done. That means the goroutine will keep running forever, which is not good because it's what we call a goroutine leak.

This is a better way to do it:

func fanIn(input1, input2 <-chan int) <-chan int {
    c := make(chan int)
    go func() {
        defer close(c) // 1
        for input1 != nil || input2 != nil { // 2
            select {
            case s, ok := <-input1:
                if !ok {
                    input1 = nil // 3
                    continue
                }
                c <- s
            case s, ok := <-input2:
                if !ok {
                    input2 = nil // 4
                    continue
                }
                c <- s
            }
        }
    }()

    return c
}

Longer, right? Here's why this version works better:

  • The loop keeps going as long as either input1 or input2 is still open.
  • If a channel gets closed, we set that channel to nil. A nil channel won't send or receive any more data, and this helps avoid an endless loop.
  • If both channels are closed, the loop will end, and the goroutine will stop.

Okay, let's try this out with a simple generator (remember the first pattern?) to see how it works:

func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func main() {
// Set up the generators, each producing a stream of numbers
	c1 := generator(1, 3, 5)
	c2 := generator(2, 4, 6)

	c := fanIn(c1, c2)

	for n := range c {
		fmt.Println(n)
	}
}

The order of the output is not predictable; we could see something like 1 2 3 5 4 6, because it depends on how the goroutines happen to be scheduled.

But what if we have more than two channels to combine? Using a select statement gets tricky if we have a lot of channels.

Instead, we could set up each channel with its own goroutine that takes values from that channel and sends them to a shared output channel.

This way, it's easier to handle more than two channels without making our code too complicated:

func fanIn(inps ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}

	// Start a goroutine for each input channel.
	wg.Add(len(inps))
	for _, c := range inps {
		go output(c)
	}

	// Start a goroutine to close the output channel
	// once all the input channels have been drained.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

It might look a bit complex at first, but it's actually just a more flexible setup that can handle as many input channels as we need.

Here's what happens:

  • For every input channel, we start a goroutine that waits for values.
  • When a value comes in, it's passed along to the out channel.
  • We use a sync.WaitGroup to make sure all these goroutines have finished before we close the out channel.
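To see it in action, wiring three generators into this variadic fanIn could look something like this (a small sketch reusing the generator from earlier):

func main() {
	// Three independent sources this time.
	c1 := generator(1, 4, 7)
	c2 := generator(2, 5, 8)
	c3 := generator(3, 6, 9)

	// The variadic fanIn merges them into a single stream.
	for n := range fanIn(c1, c2, c3) {
		fmt.Println(n) // the interleaving order is not guaranteed
	}
}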

Semaphore Pattern

The semaphore pattern is used to control how many goroutines can access something at the same time. Instead of using a sync.Mutex that allows only one at a time, we can use a semaphore to let a few more in.

In Go, we can use a buffered channel as a semaphore. The size of the channel tells us how many can go at once. When we send something into the channel, we're taking up one of those slots; when we take something out, we're done and someone else can go.

Talk is cheap, let's see the code (read the comments to understand what's happening):

func main() {
	// Define the number of tokens (resources) and the number of goroutines.
	const numTokens = 3
	const numGoroutines = 10

	// Create a buffered channel to act as a semaphore, with 3 slots.
	semaphore := make(chan struct{}, numTokens)

	var wg sync.WaitGroup
	wg.Add(numGoroutines)

	for i := 1; i <= numGoroutines; i++ {
		go func(id int) {
			defer wg.Done()

			semaphore <- struct{}{} // acquire a token

			fmt.Printf("Goroutine %d acquired the token.\n", id)
			time.Sleep(time.Second) // simulate heavy work

			<-semaphore // release the token
		}(i)
	}

	wg.Wait()
}

In this code, we have 10 goroutines that all want to do something, but only 3 of them can do it at the same time. Each one waits its turn, does its work, and then lets the next one go.

The semaphore channel is set up to allow only 3 goroutines at once because its size is 3.

If you'd rather not deal with the bare struct{}{} writes and channel reads directly, we can wrap the same logic in a small, dedicated type:

// Semaphore encapsulates the semaphore logic.
type Semaphore struct {
	ch chan struct{}
}

// NewSemaphore creates a new Semaphore with a given maximum count.
func NewSemaphore(maxCount int) *Semaphore {
	return &Semaphore{
		ch: make(chan struct{}, maxCount),
	}
}

// Acquire waits to obtain a permit from the semaphore.
func (s *Semaphore) Acquire() {
	s.ch <- struct{}{}
}

// Release returns a permit to the semaphore.
func (s *Semaphore) Release() {
	<-s.ch
}

This code defines a Semaphore type that manages the wait-and-release process for us: every time we call Acquire, it waits until there's a slot available, and Release frees up a slot.

func doSomething() {
	semaphore.Acquire()
	defer semaphore.Release()

// ...do something
}
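The doSomething snippet assumes a semaphore value already exists somewhere; a minimal sketch of wiring it up (the package-level variable and the counts here are illustrative, not from the original) might be:

var semaphore = NewSemaphore(3) // allow at most 3 concurrent doSomething calls

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			doSomething() // Acquire and Release happen inside doSomething
		}()
	}
	wg.Wait()
}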

Now, let's move on to the next topic, which is the Timeout Pattern.

Timeout Pattern

The timeout pattern is useful for making sure that a section of our code doesn't run for too long without stopping.

Even though Go has tools for this in its context package, not everyone uses them when they should, like when they need to close things down smoothly.

Here's an example of what we shouldn't do:

func doJob() {
	for {
// ... doing something

		time.Sleep(time.Minute) // wait for a minute
	}
}

If we write a function like this, it'll keep running and there's no easy way to stop it.

So, if we're trying to shut down our application, this job could be cut off before it's done and that's not what we want, especially if it's in the middle of something important.

The timeout pattern is a way to make sure our code can stop when it needs to. We can use the context package from Go to set a timeout and then we send this context to our function, like this:

func doJob(ctx context.Context) {
	for {
		// ... do something
		select {
		case <-ctx.Done():
			return
		default:
			time.Sleep(time.Minute)// wait for a minute
		}
	}
}

But there's still a problem here, even though our job can now stop.

The time.Sleep call will make us wait the full minute, even if we're told to stop partway through. To fix this, we use a ticker that can be stopped. Here's how it looks:

func doJob(ctx context.Context) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// ... do something every minute
		}
	}
}

With this setup, we can stop the job whenever we need to, which is much better.

But be careful: if the job takes a long time, the ticker keeps firing every minute; it won't wait for the job to finish, so a new job could start immediately after the last one.

"I want to use sleep, but I want to interrupt it, what should I do?"

We can use time.NewTimer instead of time.NewTicker to create a sleep that can be interrupted.

It's like time.Sleep, but it gives us the ability to stop it if we get a signal to do so. Here's how we can implement it:

func sleep(ctx context.Context, duration time.Duration) {
	timer := time.NewTimer(duration)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return
	case <-timer.C:
		return
	}
}

We can use this sleep function in our code whenever we need to pause. If the context sends a cancellation signal, it stops waiting right away, and calling it is nearly as short as writing time.Sleep(duration).
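Plugged back into the earlier job, it could look like this (a sketch; the actual work is elided):

func doJob(ctx context.Context) {
	for {
		// ... do something

		// Interruptible pause: returns after a minute,
		// or immediately if the context is cancelled.
		sleep(ctx, time.Minute)

		// Stop looping once we've been told to shut down.
		if ctx.Err() != nil {
			return
		}
	}
}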


Wrap it Up

We've gone through 6 patterns, each with its guidelines and some fine points. But the key takeaway is knowing when and how to use each pattern.

This understanding helps us, as developers, build Go applications that not only perform well under pressure but are also easier to maintain.

Let's quickly review:

  • Generator Pattern: This pattern involves using functions that send values to a channel.
  • Worker Pool Pattern: This sets up a group of workers that pick up tasks and do them concurrently.
  • Pipeline Pattern: We break the work into steps and move data from one step to the next.
  • Fan-In Pattern: This pattern takes input from many channels and brings it all into a single channel.
  • Semaphore Pattern: This controls access to resources by limiting how many can use them at once.
  • Timeout Pattern: This makes sure things don't run for too long and that we know when to stop.

I hope these notes help you on your Golang coding journey!
