Einführung in Go-Ameisen
ants
ist ein leistungsfähiger Goroutine-Pool, der die Planung und Verwaltung einer großen Anzahl von Goroutines implementiert, um die Begrenzung und Wiederverwendung von Ressourcen zu ermöglichen und somit eine effizientere Ausführung von Aufgaben bei der Entwicklung von nebenläufigen Programmen zu erreichen.
Eigenschaften
- Automatische Planung einer massiven Anzahl von Goroutines und Wiederverwendung derselben.
- Regelmäßige Bereinigung abgelaufener Goroutines zur weiteren Ressourcenschonung.
- Bietet eine Vielzahl nützlicher Schnittstellen: Aufgabeneinreichung, Abrufen der Anzahl aktiver Goroutines, dynamische Anpassung der Poolgröße, Freigabe des Pools und Neustart des Pools.
- Behandelt Paniksituationen gracefully, um Programmabstürze zu verhindern.
- Die Ressourcenwiederverwendung spart erheblich Speicherplatz. In Szenarien mit großen stapelverarbeitenden nebenläufigen Aufgaben weist es eine höhere Leistung als die native Goroutine-Nebenläufigkeit auf.
- Nicht-blockierender Mechanismus
Wie funktioniert ants
Flussdiagramm
Animationsbilder
Installation
Verwendung von ants
Version v1:
go get -u github.com/panjf2000/ants
Verwendung von ants
Version v2 (GO111MODULE=on aktivieren):
go get -u github.com/panjf2000/ants/v2
Verwendung
Bei der Erstellung eines Go-Nebenläufigkeitsprogramms, das eine große Anzahl von Goroutines startet, wird zwangsläufig eine große Menge an Systemressourcen (Speicher, CPU) verbraucht. Durch die Verwendung von ants
können Sie einen Goroutine-Pool instanziieren, um Goroutines wiederzuverwenden, Ressourcen zu sparen und die Leistung zu verbessern:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("ausgeführt mit %d\n", n)
}
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hallo Welt!")
}
func main() {
defer ants.Release()
runTimes := 1000
// Den generellen Pool verwenden.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("ausgeführte Goroutines: %d\n", ants.Running())
fmt.Printf("alle Aufgaben abgeschlossen.\n")
// Den Pool mit einer Funktion verwenden,
// 10 als Kapazität des Goroutine-Pools und 1 Sekunde für die Ablaufdauer setzen.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Aufgaben nacheinander einreichen.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("ausgeführte Goroutines: %d\n", p.Running())
fmt.Printf("alle Aufgaben abgeschlossen, Ergebnis ist %d\n", sum)
}
Pool-Konfiguration
// Option stellt die optionale Funktion dar.
type Option func(opts *Options)
// Options enthält alle Optionen, die angewendet werden, wenn ein Ameisen-Pool instanziiert wird.
type Options struct {
// ExpiryDuration ist ein Zeitraum für den Reinigungsgoroutine, um verfallene Arbeiter aufzuräumen,
// Der Reiniger durchsucht alle Arbeiter alle `ExpiryDuration` und räumt die Arbeiter auf, die seit mehr als `ExpiryDuration` nicht verwendet wurden.
ExpiryDuration time.Duration
// PreAlloc gibt an, ob bei der Initialisierung des Pools eine Speicher-Vorabzuweisung vorgenommen werden soll.
PreAlloc bool
// Maximale Anzahl von Goroutinen, die bei pool.Submit blockieren.
// 0 (Standardwert) bedeutet keine solche Begrenzung.
MaxBlockingTasks int
// Wenn Nonblocking wahr ist, wird Pool.Submit nie blockiert.
// ErrPoolOverload wird zurückgegeben, wenn Pool.Submit nicht sofort ausgeführt werden kann.
// Wenn Nonblocking wahr ist, ist MaxBlockingTasks unwirksam.
Nonblocking bool
// PanicHandler wird verwendet, um Paniken von jeder Arbeiter-Goroutine zu behandeln.
// Wenn nil, werden Paniken wieder von den Arbeiter-Goroutinen ausgeworfen.
PanicHandler func(interface{})
// Logger ist der benutzerdefinierte Logger zum Protokollieren von Informationen, wenn er nicht festgelegt ist,
// wird der Standard-Logger aus dem Protokollpaket verwendet.
Logger Logger
}
// WithOptions akzeptiert die gesamte Optionskonfiguration.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
// WithExpiryDuration legt die Intervallzeit für die Aufräumung von Goroutinen fest.
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
// WithPreAlloc gibt an, ob für Arbeiter Speicher bereitgestellt werden soll.
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
// WithMaxBlockingTasks legt die maximale Anzahl von blockierten Goroutinen fest, wenn die Kapazität des Pools erreicht ist.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
// WithNonblocking gibt an, dass der Pool nil zurückgibt, wenn keine verfügbaren Arbeiter vorhanden sind.
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
// WithPanicHandler legt den Panic-Handler fest.
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
// WithLogger legt einen benutzerdefinierten Logger fest.
func WithLogger(logger Logger) Option {
return func(opts *Options) {
opts.Logger = logger
}
}
Durch die Verwendung verschiedener optionaler Funktionen beim Aufruf von NewPool
/NewPoolWithFunc
können die Werte der einzelnen Konfigurationselemente in ants.Options
festgelegt und dann zur Anpassung des Goroutine-Pools verwendet werden.
Benutzerdefinierter Pool
ants
unterstützt die Instanziierung des eigenen Pools des Benutzers mit einer spezifischen Poolkapazität; durch Aufruf der Methode NewPool
kann ein neuer Pool mit der angegebenen Kapazität instanziiert werden, wie folgt:
p, _ := ants.NewPool(10000)
Aufgabenübermittlung
Aufgaben werden durch Aufruf der Methode ants.Submit(func())
übermittelt:
ants.Submit(func(){})
Dynamische Anpassung der Goroutine-Poolkapazität
Um die Goroutine-Poolkapazität dynamisch anzupassen, können Sie die Methode Tune(int)
verwenden:
pool.Tune(1000) // Passt die Kapazität auf 1000 an
pool.Tune(100000) // Passt die Kapazität auf 100000 an
Diese Methode ist threadsicher.
Voraballokation von Goroutine-Queuespeicher
Die ants
ermöglicht es Ihnen, Speicher für den gesamten Pool vorab zuzuweisen, was die Leistung des Goroutine-Pools in bestimmten Szenarien verbessern kann. In einem Szenario, in dem ein Pool mit einer sehr großen Kapazität benötigt wird und jede Aufgabe innerhalb der Goroutine zeitaufwendig ist, reduziert die Voraballokation von Speicher für die Goroutine-Queue unnötige Speicherneuzuweisungen.
// ants wird die gesamte Kapazität des Pools vorab reservieren, wenn Sie diese Funktion aufrufen
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
Freigabe des Pools
pool.Release()
Neustart des Pools
// Durch Aufrufen der Methode Reboot() können Sie einen zuvor zerstörten Pool reaktivieren und wieder verwenden.
pool.Reboot()
Über die Reihenfolge der Aufgabenausführung
ants
garantiert nicht die Reihenfolge der Aufgabenausführung, und die Ausführungsreihenfolge entspricht nicht unbedingt der Reihenfolge der Einreichung. Dies liegt daran, dass ants
alle eingereichten Aufgaben parallel verarbeitet und die Aufgaben an gleichzeitig laufende Worker zugewiesen werden, was zu einer Ausführung der Aufgaben in einer parallelen und nicht deterministischen Reihenfolge führt.