Easy Concurrency: Active Object Pattern in Go Explained


Introduction

Sometimes you need to decouple method execution from method invocation. In such cases the Active Object design pattern is a good choice. It allows for more flexible and, above all, concurrent systems. One of the hallmarks of this pattern is that each method call is encapsulated, or packaged if you will, in an object.

These objects are then placed in a queue. The Active Object itself then processes the queue in a separate thread (in Go, a goroutine).

This all sounds rather abstract, so we will implement a very simplified example.

Implementation in Go

In this example, we’ll create a basic log processor. Instead of writing logs directly to the console, we’ll use a queue to process log entries asynchronously. This is especially useful when dealing with time-consuming tasks like writing logs to a database or a remote server.

Let’s start:

package main

import (
	"fmt"
	"sync"
	"time"
)

type LogSeverityType string

Most logging systems distinguish several severity levels. This is how we implement that in Go:

const (
	Info    LogSeverityType = "Info"
	Warning LogSeverityType = "Warning"
	Error   LogSeverityType = "Error"
)

Now we come to the LogMessage itself:

type LogMessage struct {
	Severity LogSeverityType
	Message  string
}

Now we come to the Active Object itself:

type ActiveLogger struct {
	logQueue  chan *LogMessage
	stopEvent chan struct{}
	wg        sync.WaitGroup
}

func createActiveLogger() *ActiveLogger {
	return &ActiveLogger{
		logQueue:  make(chan *LogMessage, 10),
		stopEvent: make(chan struct{}),
	}
}

Some explanation is needed here:

  1. First we need a queue of messages to be processed. That is logQueue, a buffered channel with room for 10 messages.
  2. We also need a way to stop processing; that is what stopEvent is for.
  3. We also need to wait for the processing goroutine to end, hence the sync.WaitGroup.

Next we need to be able to log:

func (l *ActiveLogger) Log(message *LogMessage) {
	l.logQueue <- message
}
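Because logQueue is a buffered channel, Log blocks the caller once the buffer is full and the processor has not caught up. If blocking is not acceptable, one option is a non-blocking variant built on select with a default case. The sketch below is a standalone, trimmed-down copy of the types, and TryLog is a hypothetical method that is not part of the logger in this article.

```go
package main

import "fmt"

// Trimmed-down copies of the article's types so the sketch runs on its own.
type LogMessage struct {
	Severity string
	Message  string
}

type ActiveLogger struct {
	logQueue chan *LogMessage
}

// TryLog is a hypothetical non-blocking variant of Log.
// It returns false instead of waiting when the queue is full.
func (l *ActiveLogger) TryLog(message *LogMessage) bool {
	select {
	case l.logQueue <- message:
		return true
	default:
		// The buffer is full: drop the message and tell the caller.
		return false
	}
}

func main() {
	// A queue of size 2 and no consumer, so the third message cannot be queued.
	logger := &ActiveLogger{logQueue: make(chan *LogMessage, 2)}
	for i := 1; i <= 3; i++ {
		ok := logger.TryLog(&LogMessage{
			Severity: "Info",
			Message:  fmt.Sprintf("Message number is %d", i),
		})
		fmt.Printf("message %d queued: %v\n", i, ok)
	}
}
```

Whether dropping messages is preferable to blocking depends on the application; for audit logs, for instance, blocking may be the safer choice.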

Next we can start processing:

func (l *ActiveLogger) StartLogProcessor() {
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		for {
			select {
			case message := <-l.logQueue:
				l.processMessage(message)
			case <-l.stopEvent:
				return
			}
		}
	}()
}

Line by line:

  1. First of all we increment the WaitGroup counter.
  2. Then we start a new goroutine.
  3. With the defer statement we make sure the WaitGroup is decremented when we exit the goroutine.
  4. Next we enter an infinite loop:
    • When we find a message in the message queue, we process it.
    • When stopEvent is closed, the receive on it succeeds and we return from the goroutine.

Next we need to be able to stop the processor:

func (l *ActiveLogger) StopLogProcessor() {
	close(l.stopEvent)
	l.wg.Wait()
}

In this method we close the stopEvent channel, thereby stopping processing. Then we wait for the processing goroutine to finish.

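The processMessage method is now quite simple to write. The time.Sleep call simulates a time-consuming operation, such as writing to a database or a remote server: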

func (l *ActiveLogger) processMessage(m *LogMessage) {
	fmt.Printf("Processing: (%s): %s\n", m.Severity, m.Message)
	time.Sleep(500 * time.Millisecond)
	fmt.Printf("Processed: (%s): %s\n", m.Severity, m.Message)
}

Testing time

Now we can test this setup:

func main() {
	logger := createActiveLogger()

	logger.StartLogProcessor()
	for i := 1; i <= 10; i++ {
		message := &LogMessage{
			Message:  fmt.Sprintf("Message number is %d", i),
			Severity: Info,
		}
		logger.Log(message)
	}
	message := &LogMessage{
		Message:  "There has been an error",
		Severity: Error,
	}
	logger.Log(message)

	time.Sleep(2 * time.Second)
	logger.StopLogProcessor()
	fmt.Println("Stopped log processor")
}

Line by line:

  1. We create our active object and start it. Remember that StartLogProcessor starts an infinite loop in a separate goroutine.
  2. Next we create 10 new messages and send them to the queue.
  3. We also add an extra error message.
  4. After two seconds we stop the processor and print a message.

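Run this and you should see the messages appear in the console. With the 500 ms delay in processMessage(), however, not all eleven messages are processed before the program exits: processing them all would take about 5.5 seconds, and we stop after 2. Lower the delay to 50 ms, for example, and they all get through. This has to do with the fact that: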

  1. This pattern is by definition asynchronous
  2. You shut down the Active Object before all items have been processed.

Conclusion

Go’s built-in support for concurrency, in the form of goroutines and channels, makes implementing the Active Object pattern straightforward. This pattern is useful for various scenarios, such as message processing, printer spooling, or chat applications, where decoupling method execution from invocation and handling tasks concurrently is beneficial.
