Simple Concurrent Join Pattern: Streamlined Coordination in Go

Photo by Lukas Hartmann: https://www.pexels.com/photo/photo-of-person-pressing-the-button-of-pedestrian-box-1827232/

Introduction

In some applications it is handy, if not necessary, for the main thread (or any other thread) to wait for several worker threads to complete.

Examples of these kinds of problems are:

  1. Web scraping: when fetching data from several sites, or from different parts of the same site, it is often efficient to let those different parts be handled by separate threads. We will see a very basic implementation of this later on.
  2. File downloads: again, you could start a separate thread for each download and wait for each thread to complete.
  3. Database operations: you can have threads handling different queries or transactions from different users, while the main thread waits for each of them to complete.

There are probably many other use cases. What they have in common is that part of the work is delegated to separate threads, and one thread waits for all of them to complete.

One constraint is that the worker threads must represent independent tasks, so there are no dependencies between them.

Implementation in Go

In our example we will build a simple web scraper, querying a URL and printing the status code.

We will start by importing our packages:

import (
	"fmt"
	"net/http"
	"sync"
)

Next we will define our Worker function type. A Worker takes a boolean channel to signal the completion of its task, and a URL to scrape:

type Worker func(done chan bool, url string)

Next we will define the Join() function:

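// Join starts every worker in its own goroutine and blocks until all of them have finished.
func Join(workers ...Worker) {
	var wg sync.WaitGroup
	done := make(chan bool)
	links := []string{
		"https://www.google.com",
		"https://www.hackingwithgo.nl",
	}
	// Each worker is paired with one link, so never start more workers than links.
	if len(workers) > len(links) {
		workers = workers[:len(links)]
	}
	fmt.Printf("Starting %d workers\n", len(workers))
	for index, worker := range workers {
		wg.Add(1)
		go func(index int, worker Worker) {
			defer wg.Done() // runs on every return path of this goroutine
			fmt.Printf("Starting worker %d\n", index)
			worker(done, links[index])
		}(index, worker)
	}

	// Close done once all workers have returned; this ends the loop below.
	go func() {
		wg.Wait()
		close(done)
	}()

	// Drain completion signals until done is closed.
	for range done {
	}
}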

There are several things happening here:

  1. The Join() function takes a so-called variadic parameter, which means it can be called with any number of arguments.
  2. It initializes a sync.WaitGroup to keep track of the goroutines.
  3. It also initializes a done channel, so that goroutines can signal their completion.
  4. After that, we iterate over the supplied worker functions, starting each worker in its own goroutine. Each worker is passed the done channel and a URL from the links slice.
  5. A separate goroutine waits for all workers to finish. Once they have, it closes the done channel.
  6. Lastly, the Join() function enters a loop that receives signals from the done channel. Because the channel is only closed after all workers have returned, Join() is effectively blocked until all the workers have finished.
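
To see the mechanics of these steps in isolation, here is a minimal sketch without any network calls. The names joinCount and signalOnce are hypothetical and not part of the code in this post: joinCount follows the same steps as Join() but returns the number of completion signals it received, and signalOnce stands in for a real worker.

```go
package main

import (
	"fmt"
	"sync"
)

// joinCount is a hypothetical variant of Join that returns how many
// completion signals it received before the done channel was closed.
func joinCount(workers ...func(done chan<- bool)) int {
	var wg sync.WaitGroup
	done := make(chan bool)

	for _, worker := range workers {
		wg.Add(1)
		go func(w func(done chan<- bool)) {
			defer wg.Done()
			w(done)
		}(worker)
	}

	// Close done once every worker has returned, which ends the range loop below.
	go func() {
		wg.Wait()
		close(done)
	}()

	count := 0
	for range done {
		count++
	}
	return count
}

// signalOnce stands in for real work: it only signals completion.
func signalOnce(done chan<- bool) {
	done <- true
}

func main() {
	fmt.Println(joinCount(signalOnce, signalOnce, signalOnce)) // prints 3
}
```

Because the done channel is unbuffered, each worker blocks on its send until the loop in joinCount receives the value, so the count always equals the number of workers that signalled.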

The scrape() function fetches a URL and prints the returned status code. It also signals completion on the done channel:

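func scrape(done chan bool, url string) {
	// Signal completion on every return path, including errors.
	defer func() { done <- true }()

	fmt.Printf("Scraping %s\n", url)
	response, err := http.Get(url)
	if err != nil {
		fmt.Println("Error:", err)
		return // response is nil here, so it must not be used
	}
	defer response.Body.Close()
	fmt.Println(response.StatusCode)
}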
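
If you want to try the fetching logic without hitting real sites, one option is the standard library's net/http/httptest package. The sketch below is only an illustration: fetchStatus and newTeapotServer are hypothetical helpers, not part of the scraper above, and fetchStatus returns the status code instead of printing it so that the result can be checked.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// fetchStatus is a hypothetical helper that returns the status code of a GET request.
func fetchStatus(url string) (int, error) {
	response, err := http.Get(url)
	if err != nil {
		return 0, err
	}
	defer response.Body.Close()
	return response.StatusCode, nil
}

// newTeapotServer starts a local test server that always answers 418.
func newTeapotServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTeapot)
	}))
}

func main() {
	server := newTeapotServer()
	defer server.Close()

	status, err := fetchStatus(server.URL)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(status) // prints 418
}
```

Note that http.Get only returns an error when the request itself fails. A non-2xx status such as 418 still comes back as a normal response.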

Time to test

Testing this pattern is pretty simple: call the Join function and pass it scrape twice, once for each link:

func main() {
	Join(scrape, scrape)
	fmt.Println("Done!")
}

Conclusion

Building this pattern in Go was relatively easy once I understood how channels and WaitGroups work. To be fair, while experimenting with the code, I ran into some panics and some deadlocks.

One of the things missing from this code is real error handling. As you can see, in case of an error the scrape function just prints the error and returns. This is not what you want in production code. However, in a next post I will return to this and make this pattern more sophisticated and production ready.
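
As a rough idea of one possible direction, and not necessarily what the follow-up will look like, workers could send a small result value instead of a bare boolean, so the caller can see which URLs failed. Everything in this sketch (result, joinResults, fakeFetch) is hypothetical, and fakeFetch stands in for a real HTTP call.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result is a hypothetical type pairing a URL with the error (if any) its worker hit.
type result struct {
	url string
	err error
}

// joinResults runs fetch once per URL, each in its own goroutine,
// and returns one result per URL in completion order.
func joinResults(fetch func(url string) error, urls ...string) []result {
	var wg sync.WaitGroup
	results := make(chan result)

	for _, url := range urls {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			results <- result{url: url, err: fetch(url)}
		}(url)
	}

	// Close results once every worker has returned, ending the loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var collected []result
	for r := range results {
		collected = append(collected, r)
	}
	return collected
}

// fakeFetch stands in for a real HTTP call and fails for the URL "bad".
func fakeFetch(url string) error {
	if url == "bad" {
		return errors.New("unreachable")
	}
	return nil
}

func main() {
	for _, r := range joinResults(fakeFetch, "good", "bad") {
		fmt.Println(r.url, r.err)
	}
}
```

The order of the results depends on which goroutine finishes first, so the caller should not rely on it matching the order of the URLs.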
