Go Pattern: Hybrid Handler

Overview

In today’s high-performance and concurrent computing environments, effectively processing a stream of messages using a mix of purely computational functions and remote procedure calls has become a significant challenge. The Go programming language is designed to handle concurrency well, but when it comes to managing a hybrid load, even Go can struggle to achieve optimal CPU utilization. In this article, we will discuss the Hybrid Handler pattern, an efficient and unified approach to address this challenge.

The problem

Assume we have a stream of incoming messages that we need to process with a list of handlers, aggregating the results and sending the output downstream.

If all the handlers are purely local computations, then an implementation like the one below already meets the basic functional and performance requirements.

type Processor struct {
    handlers []SyncHandler
}

type SyncHandler interface {
    Handle(ctx context.Context, message Message) (Result, error)
}

func (p *Processor) ProcessMessage(ctx context.Context, message Message) error {
    output := NewOutput()
    for _, handler := range p.handlers {
        result, err := handler.Handle(ctx, message)
        if err != nil {
            return err
        }
        output.Add(result)
    }
    return p.SendOutput(output)
}

For example, with an 8-core CPU we can simply initialise eight Processor objects and run them in eight goroutines, easily making full use of the hardware (error handling is omitted here).

for i := 0; i < numCPU; i++ {
    go func() {
        processor := NewProcessor()
        for message := range inputChan {
            processor.ProcessMessage(ctx, message)
        }
    }()
}

However, if the processing logic involves a mix of local computations and remote procedure calls, it becomes more difficult to process a high volume of messages in a timely and efficient manner. If the processor still handles messages one by one, it must wait for all handlers to complete before outputting the corresponding result, and the added I/O round-trip delay for each message is so high that the CPU cores sit idle most of the time.

To mitigate this issue, one could attempt to saturate the CPU with brute force, parallelizing the processors with even more goroutines, as in most microservices written in Go, where one goroutine handles one incoming request. While this might work well enough for an I/O-bound application or when traffic is low, it can significantly penalize performance when all or most of the handlers are pure computations, because channel message passing and goroutine scheduling cost more than the computations themselves.

Another feasible but more ad-hoc solution is to separate purely computational handlers from remote handlers into two groups and execute them differently, which could remove the penalty on pure computations. However, waiting for all the results remains a problem, and two separate groups add complexity to both the start and the finish of those functions.

What we need here is an abstraction that unifies local and remote handlers without the overhead of goroutines and channels, allowing purely computational handlers to run at bare-metal speed while remote handlers do not block unnecessarily.

The Hybrid Handler pattern

type Handler interface {
    Handle(
        ctx          context.Context,
        message      Message,
        returnResult ReturnResultFunc,
    ) error
}

type ReturnResultFunc func(ctx context.Context, result Result) error

The key to addressing these challenges lies in an interface method that provides a callback parameter returnResult for returning the result, which allows the implementation of the interface to be either a synchronous pure computation or an asynchronous IO operation without introducing much overhead.

Is it just a function with a callback?

Yes. At first glance it might look not only trivial but also unidiomatic in Go, but on closer inspection it is a simple and natural solution to the problem at hand.

The new processor

With the Handler interface above, the Processor can be re-implemented as below.

On arrival of each result, the output decrements a counter atomically to determine whether it is ready to be sent downstream. This adds far less overhead than the brute-force approach of spawning many goroutines.

type Processor struct {
    handlers []Handler
}

type Output struct {
    handlerCount int64
    processor    *Processor
}

func (p *Processor) NewOutput(handlerCount int) *Output {
    return &Output{
        handlerCount: int64(handlerCount),
        processor:    p,
    }
}

func (p *Processor) ProcessMessage(ctx context.Context, message Message) error {
    output := p.NewOutput(len(p.handlers))
    for _, handler := range p.handlers {
        if err := handler.Handle(ctx, message, output.Add); err != nil {
            return err
        }
    }
    return nil
}

func (o *Output) Add(ctx context.Context, result Result) error {
    // add the result to the output
    // ......

    // check if it's the result of the last handler
    if atomic.AddInt64(&o.handlerCount, -1) > 0 {
        return nil // not the last one
    }

    // it is the last one
    return o.processor.SendOutput(o)
}

Synchronous handlers

It is almost trivial to write an adapter from any synchronous handler to the Handler interface above, and the overhead is just one extra interface method call.

type SyncHandler interface {
    Handle(ctx context.Context, message Message) (Result, error)
}

type SyncHandlerAdapter struct {
    syncHandler SyncHandler
}

func (a SyncHandlerAdapter) Handle(
    ctx          context.Context,
    message      Message,
    returnResult ReturnResultFunc,
) error {
    result, err := a.syncHandler.Handle(ctx, message)
    if err != nil {
        return err
    }
    return returnResult(ctx, result)
}

Asynchronous handlers

There are various techniques to wrap a remote procedure call in an asynchronous handler efficiently, e.g. via a batch or a streaming API. I do not intend to cover the details here, but regardless of the implementation, the Handler interface allows the Handle method to return immediately without blocking the processing of the next message, while a background goroutine sends the result back through the returnResult callback once it is ready.

A hybrid handler is also possible, returning the result immediately for some inputs while doing it asynchronously for others.

We need to pay extra attention to error handling in asynchronous handlers. The handler must ensure a result is always returned so that the output decrements its counter correctly, even when the remote call fails. One possible approach is to embed the error in the result that is passed back.

Conclusion

The Hybrid Handler pattern allows synchronous and asynchronous handlers to coexist without significant overhead, improving CPU utilization while reducing complexity. With it, developers can optimize performance in high-throughput systems while keeping the flexibility to handle varied workloads.
