Tunny is a Golang library for spawning and managing a goroutine pool, allowing you to limit work coming from any number of goroutines with a synchronous API.

A fixed goroutine pool is extremely useful when your work comes from an arbitrary number of asynchronous sources but your capacity for parallel processing is limited. For example, when processing CPU-heavy jobs that arrive as HTTP requests, you can create a pool sized to the number of CPUs.

Installation

go get github.com/Jeffail/tunny

Alternatively, using dep:

dep ensure -add github.com/Jeffail/tunny

Usage

For most cases, your heavy work can be expressed as a simple func(interface{}) interface{}, in which case you can use NewFunc. Here is the HTTP example from above, with the pool sized to the CPU count:

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Perform some CPU-intensive operations using payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()

		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
			return // Stop here: there is no payload to process.
		}

		// Funnel this work into our pool. This call is synchronous and will block until the job is complete.
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}

Tunny also supports timeouts. You can replace the above Process call with the following code:

result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
	return // Do not go on to write a result that never arrived.
}

You can also use the request context (or any other context) to handle timeouts and deadlines. Simply replace the Process call with the following code:

result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
	return // Do not go on to write a result that never arrived.
}

Modifying Pool Size

You can use SetSize(int) to change the size of the Tunny pool at any time.

pool.SetSize(10) // 10 goroutines
pool.SetSize(100) // 100 goroutines

This is safe to do from any goroutine, even while other goroutines are still processing jobs.

Stateful Goroutine

Sometimes each goroutine in the Tunny pool needs its own managed state. In that case, you should implement tunny.Worker, which includes calls for terminating, for interrupting (in case a job times out and is no longer needed), and for blocking the allocation of the next job until a condition is met.

When creating a pool with the Worker type, you need to provide a constructor to generate your custom implementation:

pool := tunny.New(poolSize, func() tunny.Worker {
	// TODO: Perform state allocation for each goroutine here.
	return newCustomWorker()
})

This allows Tunny to cleanly handle the creation and destruction of Worker instances when the pool size changes.

Ordering

Backlogged jobs are not guaranteed to be processed in order. Due to the current implementation of channels and select blocks, a backlog of jobs will in practice be processed as a FIFO queue. However, this behavior is not part of the specification and should not be relied upon.