Tunny est une bibliothèque en Golang permettant de créer et de gérer des pools de goroutines, vous permettant de limiter le travail à partir de n'importe quel nombre de goroutines à l'aide d'API synchrones.
Lorsque votre travail provient d'un nombre arbitraire de sources asynchrones mais que votre capacité de traitement parallèle est limitée, un pool de goroutines fixe est extrêmement utile. Par exemple, lors du traitement de travaux de requêtes HTTP intensifs en CPU, vous pouvez créer un pool de la taille du nombre de CPU.
Installation
go get github.com/Jeffail/tunny
Alternativement, en utilisant dep :
dep ensure -add github.com/Jeffail/tunny
Utilisation
Dans la plupart des cas, votre travail intensif peut être représenté par une simple func()
, auquel cas vous pouvez utiliser NewFunc
. Voyons comment utiliser notre exemple de requêtes HTTP pour comptage CPU :
package main
import (
"io/ioutil"
"net/http"
"runtime"
"github.com/Jeffail/tunny"
)
func main() {
numCPUs := runtime.NumCPU()
pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
var result []byte
// TODO: Effectuer des opérations intensives en CPU en utilisant le payload
return result
})
defer pool.Close()
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
input, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Erreur interne", http.StatusInternalServerError)
}
defer r.Body.Close()
// Importez ce travail dans notre pool. Cet appel est synchrone et bloquera jusqu'à ce que le travail soit terminé.
result := pool.Process(input)
w.Write(result.([]byte))
})
http.ListenAndServe(":8080", nil)
}
Tunny prend également en charge les délais d'attente. Vous pouvez remplacer l'appel Process
ci-dessus par le code suivant :
result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
http.Error(w, "Délai de la requête dépassé", http.StatusRequestTimeout)
}
Vous pouvez également utiliser le contexte de la requête (ou tout autre contexte) pour gérer les délais et les échéances. Remplacez simplement l'appel Process
par le code suivant :
result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
http.Error(w, "Délai de la requête dépassé", http.StatusRequestTimeout)
}
Modification de la taille du pool
Vous pouvez utiliser SetSize(int)
pour changer la taille du pool Tunny à tout moment.
pool.SetSize(10) // 10 goroutines
pool.SetSize(100) // 100 goroutines
Cela est sécurisé même si d'autres goroutines sont toujours en cours de traitement.
Goroutine avec état
Parfois, chaque goroutine dans le pool Tunny a besoin de son propre état de gestion. Dans ce cas, vous devriez implémenter tunny.Worker
, qui comprend des appels pour la terminaison, l'interruption (si un travail prend trop de temps et n'est plus nécessaire) et le blocage de l'allocation du prochain travail jusqu'à ce qu'une certaine condition soit remplie.
Lors de la création d'un pool avec le type Worker
, vous devez fournir un constructeur pour générer votre implémentation personnalisée :
pool := tunny.New(poolSize, func() Worker {
// TODO: Effectuer l'allocation d'état pour chaque goroutine ici.
return newCustomWorker()
})
De cette manière, Tunny peut nettoyer la création et la destruction du type Worker
lorsque la taille du pool change.
Ordonnancement
Les travaux en attente ne sont pas garantis d'être traités dans l'ordre. En raison de l'implémentation actuelle des canaux et des blocs select, les piles de travaux en attente seront traitées comme une file d'attente FIFO. Cependant, ce comportement ne fait pas partie de la spécification et ne doit pas être considéré comme fiable.