Introduction
Sometimes you need to decouple method execution from method invocation. In such cases the Active Object design pattern is a good choice. It allows for more flexible and above all concurrent systems. One the of the hallmarks of this pattern is the fact that each method call is encapsulated, or packaged if you will in an object.
These objects are then placed in a queue. The Active Object itself then processes the queue in a separate thread.
This all sounds rather abstract, so we will implement a very simplified example.
Implementation in Go
In this example, we’ll create a basic log processor. Instead of writing logs directly to the console, we’ll use a queue to process log entries asynchronously. This is especially useful when dealing with time-consuming tasks like writing logs to a database or a remote server.
Let’s start:
package main
import (
"fmt"
"sync"
"time"
)
type LogSeverityType string
In most logging systems, you can usually have several types of severity, this is how we implement that in Go:
const (
Info LogSeverityType = "Info"
Warning LogSeverityType = "Warning"
Error LogSeverityType = "Error"
)
Now we come the LogMessage
itself:
type LogMessage struct {
Severity LogSeverityType
Message string
}
Now we come to the Active Object itself:
type ActiveLogger struct {
logQueue chan *LogMessage
stopEvent chan struct{}
wg sync.WaitGroup
}
func createActiveLogger() *ActiveLogger {
return &ActiveLogger{
logQueue: make(chan *LogMessage, 10),
stopEvent: make(chan struct{}),
}
}
Here some explanation is needed:
- First we need a queue of message to be processed.
- We also need to have a way to stop processing, that is what
stopEvent
is for. - Also we need wait for each sub-thread to end, hence the waitGroup.
Next we need to able to log:
func (l *ActiveLogger) Log(message *LogMessage) {
l.logQueue <- message
}
Next we can start processing:
func (l *ActiveLogger) StartLogProcessor() {
l.wg.Add(1)
go func() {
defer l.wg.Done()
for {
select {
case message := <-l.logQueue:
l.processMessage(message)
case <-l.stopEvent:
return
}
}
}()
}
Line by line:
- First of all we update the waitgroup
- Then we start a new go-routine.
- First we make sure the waitgroup is updated when we exit the routine, by use of the
defer
statement. - Next we enter an infinite loop
- when we find a message in the message queue, we process that
- In case we find somehing on
stopEvent
we return from the go-routine
Next we need to be able to stop the processor:
func (l *ActiveLogger) StopLogProcessor() {
close(l.stopEvent)
l.wg.Wait()
}
In this method we close the stopEvent, thereby stopping processing. Then we wait for all the go-routines to finish.
The processMessage
is now quite simple to write:
func (l *ActiveLogger) processMessage(m *LogMessage) {
fmt.Printf("Processing: (%s): %s\n", m.Severity, m.Message)
time.Sleep(500 * time.Millisecond)
fmt.Printf("Processed: (%s): %s\n", m.Severity, m.Message)
}
Testing time
Now we can test this setup:
func main() {
logger := createActiveLogger()
logger.StartLogProcessor()
for i := 1; i <= 10; i++ {
message := &LogMessage{
Message: fmt.Sprintf("Message number is %d", i),
Severity: Info,
}
logger.Log(message)
}
message := &LogMessage{
Message: "There has been an error",
Severity: Error,
}
logger.Log(message)
time.Sleep(2 * time.Second)
logger.StopLogProcessor()
fmt.Println("Stopped log processor")
}
Line by line:
- We create our active object and start it. Remember that
StartLogProcessor
starts an infinite loop in a separate thread. - Next we create 10 new messages, and send them to the queue.
- We also add an extra error message.
- After two seconds we stop the processor, aqnd print a message.
Run this and you should see the messages appear in the console. Now increase the timeout in the processMessage()
method from 50 to 500 for example. This has to do with the fact that:
- This pattern is by definition asynchronous
- You shut down the Active Object before all items have been processed.
Conclusion
Go’s strong support for multi-threading makes implementing the Active Object pattern straightforward. This pattern is useful for various scenarios, such as message processing, printer spooling, or chat applications, where decoupling method execution from invocation and handling tasks concurrently is beneficial.