Tunny ist eine Golang-Bibliothek zum Erstellen und Verwalten von Goroutine-Pools, die es ermöglicht, die Arbeit aus einer beliebigen Anzahl von Goroutines mithilfe synchroner APIs zu begrenzen.
Wenn Ihre Arbeit aus einer beliebigen Anzahl von asynchronen Quellen stammt, Ihre gleichzeitige Verarbeitungsfähigkeit jedoch begrenzt ist, ist ein fester Goroutine-Pool äußerst nützlich. Zum Beispiel können Sie bei der Verarbeitung von CPU-intensiven HTTP-Anforderungsjobs einen Pool in der Größe der Anzahl der CPUs erstellen.
Installation
go get github.com/Jeffail/tunny
Alternativ können Sie dep verwenden:
dep ensure -add github.com/Jeffail/tunny
Verwendung
In den meisten Fällen kann Ihre aufwendige Arbeit durch eine einfache func()
-Funktion dargestellt werden, in welchem Fall Sie NewFunc
verwenden können. Sehen wir uns an, wie Sie unser Beispiel von HTTP-Anfragen zur CPU-Zählung verwenden können:
package main
import (
"io/ioutil"
"net/http"
"runtime"
"github.com/Jeffail/tunny"
)
func main() {
numCPUs := runtime.NumCPU()
pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
var result []byte
// TODO: Führen Sie einige CPU-intensive Operationen unter Verwendung von payload aus
return result
})
defer pool.Close()
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
input, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Interner Fehler", http.StatusInternalServerError)
}
defer r.Body.Close()
// Führen Sie diese Arbeit in unseren Pool ein. Dieser Aufruf ist synchron und blockiert, bis der Job abgeschlossen ist.
result := pool.Process(input)
w.Write(result.([]byte))
})
http.ListenAndServe(":8080", nil)
}
Tunny unterstützt auch Timeouts. Sie können den obigen Process
-Aufruf durch den folgenden Code ersetzen:
result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
http.Error(w, "Anforderung abgelaufen", http.StatusRequestTimeout)
}
Sie können auch den Anfragemodus (oder einen anderen Modus) verwenden, um Timeouts und Fristen zu behandeln. Ersetzen Sie einfach den Process
-Aufruf durch den folgenden Code:
result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
http.Error(w, "Anforderung abgelaufen", http.StatusRequestTimeout)
}
Änderung der Poolgröße
Sie können SetSize(int)
verwenden, um die Größe des Tunny-Pools jederzeit zu ändern.
pool.SetSize(10) // 10 Goroutines
pool.SetSize(100) // 100 Goroutines
Das ist auch dann sicher, wenn andere Goroutines noch verarbeiten.
Zustandsbehaftete Goroutine
Manchmal benötigt jede Goroutine im Tunny-Pool ihren eigenen Verwaltungszustand. In diesem Fall sollten Sie tunny.Worker
implementieren, der Aufrufe für Beendigung, Unterbrechung (wenn ein Job Timeout hat und nicht mehr benötigt wird) und Blockierung der Zuweisung des nächsten Jobs bis zu einem bestimmten Zustand umfasst.
Bei der Erstellung eines Pools mit dem Typ Worker
müssen Sie einen Konstruktor bereitstellen, um Ihre benutzerdefinierte Implementierung zu generieren:
pool := tunny.New(poolSize, func() Worker {
// TODO: Führen Sie hier die Zustandszuweisung für jede Goroutine durch.
return newCustomWorker()
})
So kann Tunny die Erstellung und Zerstörung des Worker
-Typs bereinigen, wenn sich die Poolgröße ändert.
Sortierung
Es wird nicht garantiert, dass zurückgestellte Jobs in der Reihenfolge verarbeitet werden. Aufgrund der aktuellen Implementierung von Kanälen und Select-Blöcken werden zurückgestellte Jobstapel als FIFO-Warteschlange verarbeitet. Dieses Verhalten ist jedoch nicht Teil der Spezifikation und sollte nicht darauf vertraut werden.