Tunny est une bibliothèque en Golang permettant de créer et de gérer des pools de goroutines, vous permettant de limiter le travail à partir de n'importe quel nombre de goroutines à l'aide d'API synchrones.

Lorsque votre travail provient d'un nombre arbitraire de sources asynchrones mais que votre capacité de traitement parallèle est limitée, un pool de goroutines fixe est extrêmement utile. Par exemple, lors du traitement de travaux de requêtes HTTP intensifs en CPU, vous pouvez créer un pool de la taille du nombre de CPU.

Installation

go get github.com/Jeffail/tunny

Alternativement, en utilisant dep :

dep ensure -add github.com/Jeffail/tunny

Utilisation

Dans la plupart des cas, votre travail intensif peut être représenté par une simple func(), auquel cas vous pouvez utiliser NewFunc. Voyons comment utiliser notre exemple de requêtes HTTP pour comptage CPU :

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Effectuer des opérations intensives en CPU en utilisant le payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Erreur interne", http.StatusInternalServerError)
		}
		defer r.Body.Close()

		// Importez ce travail dans notre pool. Cet appel est synchrone et bloquera jusqu'à ce que le travail soit terminé.
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}

Tunny prend également en charge les délais d'attente. Vous pouvez remplacer l'appel Process ci-dessus par le code suivant :

result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
	http.Error(w, "Délai de la requête dépassé", http.StatusRequestTimeout)
}

Vous pouvez également utiliser le contexte de la requête (ou tout autre contexte) pour gérer les délais et les échéances. Remplacez simplement l'appel Process par le code suivant :

result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
	http.Error(w, "Délai de la requête dépassé", http.StatusRequestTimeout)
}

Modification de la taille du pool

Vous pouvez utiliser SetSize(int) pour changer la taille du pool Tunny à tout moment.

pool.SetSize(10) // 10 goroutines
pool.SetSize(100) // 100 goroutines

Cela est sécurisé même si d'autres goroutines sont toujours en cours de traitement.

Goroutine avec état

Parfois, chaque goroutine dans le pool Tunny a besoin de son propre état de gestion. Dans ce cas, vous devriez implémenter tunny.Worker, qui comprend des appels pour la terminaison, l'interruption (si un travail prend trop de temps et n'est plus nécessaire) et le blocage de l'allocation du prochain travail jusqu'à ce qu'une certaine condition soit remplie.

Lors de la création d'un pool avec le type Worker, vous devez fournir un constructeur pour générer votre implémentation personnalisée :

pool := tunny.New(poolSize, func() Worker {
	// TODO: Effectuer l'allocation d'état pour chaque goroutine ici.
	return newCustomWorker()
})

De cette manière, Tunny peut nettoyer la création et la destruction du type Worker lorsque la taille du pool change.

Ordonnancement

Les travaux en attente ne sont pas garantis d'être traités dans l'ordre. En raison de l'implémentation actuelle des canaux et des blocs select, les piles de travaux en attente seront traitées comme une file d'attente FIFO. Cependant, ce comportement ne fait pas partie de la spécification et ne doit pas être considéré comme fiable.