Easy Concurrency: Harnessing the Reactor Pattern in Go Programming

Photo by Kindel Media: https://www.pexels.com/photo/close-up-photo-of-opened-switchboard-8488029/

Introduction

There are many ways to handle incoming events. If you need to be able to handle many potential service requests concurrently, the Reactor pattern could be your pattern of choice.

The key ingredient of this pattern is a so-called event-loop, which delegates request to the designated requestr handler.

Through the use of this mechanism, rather than some sort of blocking I/O, this pattern can handle many I/O requests simultaneously with optimal performance. Also modifying or expanding the available request handlers is quite easy.

This pattern strikes a balance between simplicity and scalablitly, and therefore it has become a central element in some server and networking software applications.

Implementation in Go

In this example we will implement a sample API using this pattern. In practice you would do the same thing using a framework like Gin or Fiber, or the enhancements in the http library in Go 1.22.

Let’s start by importing the needed packages:

import (
	"fmt"
	"net"
	"os"
	"sync"
)

The Handler

Next we define the Handler interface, which is implemented by all the handlers:

type Handler interface {
	ServeRequest(conn net.Conn)
}

In our example we will just have one struct, called the DefaultHandler which just prints out the received data:

type DefaultHandler struct{}

func (h DefaultHandler) ServeRequest(conn net.Conn) {
	defer conn.Close()
	buf := make([]byte, 1024)
	_, err := conn.Read(buf)
	if err != nil {
		fmt.Println("Error reading:", err.Error())
		return
	}

	fmt.Println("Received data:", string(buf))
}

Two things to note:

  1. The connection is always closed, using the defer statement
  2. Errors are handled. In a real-world example you would probably return a 500 status code.

The Reactor

Next we come to the core of our pattern, the Reactor:

type Reactor struct {
	handlers map[string]Handler
	mu       sync.Mutex
}

This struct has two fields:

  1. A dictionary or map of handlers, which the Reactor will call upon receiving a request
  2. A sync.Mutex which is used to make sure that only one thread at a time can access the handlers dictionary.

Next we implement the constructor:


func CreateReactor() *Reactor {
	return &Reactor{handlers: make(map[string]Handler)}
}

This code should be self-explanatory.

If we want to have handlers, we need to register the handlers:

func (r *Reactor) RegisterHandler(name string, handler Handler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[name] = handler
}

Two things two note:

  1. We lock the mutex before doing anything else. This means that the struct is ‘locked’ for any other thread.
  2. To make sure that we do not end up in some sort of deadlock, we use the defer statement to make sure the mutex is unlocked.

In our introduction we talked about the event loop, this is implemented in the Run() method:

func (r *Reactor) Run() error {
	listener, err := net.Listen("tcp", ":8080")
	if err != nil {
		return err
	}

	defer listener.Close()
	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println("Error accepting connection:", err.Error())
			continue
		}
		handler, ok := r.handlers["default"]
		if !ok {
			fmt.Println("No handler registered")
			conn.Close()
			continue
		}
		go handler.ServeRequest(conn)
	}
}

Line by line:

  1. First we set up a listener on port 8080
  2. if there is an error, we handle that.
  3. Again we use the defer statement to make sure the listener is closed when the function exits
  4. Next we enter the event, which we implement as an infinite for loop
  5. We wait for a connection, using the Accept() method on the listener.
  6. Again we handle the error, if there is one, and go into the next iteration.
  7. Next we check if we have handler, we close the connection, print an error message, and go into the next iteration
  8. When everything is ok, we start a go-routine with the handler.

By using the go-routine we make sure the loop never blocks.

Time to test

Now we can finally test our simple setup:

func main() {
	reactor := CreateReactor()
	reactor.RegisterHandler("default", DefaultHandler{})
	err := reactor.Run()
	if err != nil {
		fmt.Println("Error:", err.Error())
		os.Exit(1)
	}
}

The explanation:

  1. We create a new Reactor struct, and register a new request handler.
  2. Next we start the event-loop, using the Run() method.
  3. When the Run() method ends because of an error, we print the error, and exit the app.

Conclusion

As you can see, this pattern is quite logical, and are common-sense in my opinion. However, Go makes multithreading quite easy.

Possible enhancements:

  1. Looping over all available handlers, or looping over a selection of them for some provided criteria.
  2. Really returning a 500 status code or something similar.
  3. Adding logging.

If there are more request handlers, there could be contestation for resources. In Go there are mechanisms for mitigating those problems, but that is the subject for another post.

Leave a Reply

Your email address will not be published. Required fields are marked *