Introduction aux fourmis Go

ants est un pool de goroutines à haute performance qui met en œuvre la planification et la gestion d'un grand nombre de goroutines, permettant la limitation et la réutilisation des ressources, ce qui permet d'obtenir une exécution plus efficace des tâches lors du développement de programmes concurrents.

Caractéristiques

  • Planifie automatiquement un grand nombre de goroutines et les réutilise.
  • Nettoie régulièrement les goroutines expirées pour économiser davantage de ressources.
  • Fournit un grand nombre d'interfaces utiles : soumission de tâches, obtention du nombre de goroutines en cours d'exécution, ajustement dynamique de la taille du pool, libération du pool et redémarrage du pool.
  • Gère les paniques de manière élégante pour éviter les plantages du programme.
  • La réutilisation des ressources permet d'économiser considérablement la mémoire. Dans le scénario de tâches concurrentes en lots à grande échelle, il offre des performances supérieures à la concurrence native des goroutines.
  • Mécanisme non bloquant

Comment fonctionne ants

Organigramme

ants-flowchart-cn

Images animées

Installation

Utilisation de la version ants v1 :

go get -u github.com/panjf2000/ants

Utilisation de la version ants v2 (activer GO111MODULE=on) :

go get -u github.com/panjf2000/ants/v2

Utilisation

Lors de l'écriture d'un programme concurrent Go qui lance un grand nombre de goroutines, cela consommera inévitablement une grande quantité de ressources système (mémoire, CPU). En utilisant ants, vous pouvez instancier un pool de goroutines pour réutiliser les goroutines, économiser des ressources et améliorer les performances :

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("exécuter avec %d\n", n)
}

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Bonjour le monde !")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	// Utiliser le pool commun.
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("goroutines en cours d'exécution : %d\n", ants.Running())
	fmt.Printf("terminez toutes les tâches.\n")

	// Utiliser le pool avec une fonction,
	// définir 10 comme capacité du pool de goroutines et 1 seconde comme durée d'expiration.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Soumettre les tâches une par une.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("goroutines en cours d'exécution : %d\n", p.Running())
	fmt.Printf("terminez toutes les tâches, le résultat est %d\n", sum)
}

Configuration du pool

// Option représente la fonction facultative.
type Option func(opts *Options)

// Options contient toutes les options qui seront appliquées lors de l'instanciation d'un pool ants.
type Options struct {
	// ExpiryDuration est une période pour la goroutine de nettoyage pour nettoyer les travailleurs expirés,
	// la goroutine de nettoyage scanne tous les travailleurs toutes les `ExpiryDuration` et nettoie ceux qui n'ont pas été
	// utilisés depuis plus de `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc indique s'il faut effectuer une préallocation de mémoire lors de l'initialisation du pool.
	PreAlloc bool

	// Nombre maximal de goroutine bloquées sur pool.Submit.
	// 0 (valeur par défaut) signifie qu'il n'y a pas de limite.
	MaxBlockingTasks int

	// Lorsque Nonblocking est vrai, Pool.Submit ne sera jamais bloqué.
	// ErrPoolOverload sera retourné lorsque Pool.Submit ne peut pas être effectué immédiatement.
	// Lorsque Nonblocking est vrai, MaxBlockingTasks est inopérant.
	Nonblocking bool

	// PanicHandler est utilisé pour gérer les paniques de chaque goroutine travailleur.
	// si nil, les paniques seront à nouveau propagées à partir des goroutines travailleur.
	PanicHandler func(interface{})

	// Logger est le journal personnalisé pour enregistrer des informations, s'il n'est pas défini,
	// le journal standard par défaut du package log est utilisé.
	Logger Logger
}

// WithOptions accepte la configuration complète des options.
func WithOptions(options Options) Option {
	return func(opts *Options) {
		*opts = options
	}
}

// WithExpiryDuration configure le temps d'intervalle pour nettoyer les goroutines.
func WithExpiryDuration(expiryDuration time.Duration) Option {
	return func(opts *Options) {
		opts.ExpiryDuration = expiryDuration
	}
}

// WithPreAlloc indique s'il doit allouer dynamiquement de la mémoire pour les travailleurs.
func WithPreAlloc(preAlloc bool) Option {
	return func(opts *Options) {
		opts.PreAlloc = preAlloc
	}
}

// WithMaxBlockingTasks configure le nombre maximum de goroutines bloquées lorsqu'il atteint la capacité du pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
	return func(opts *Options) {
		opts.MaxBlockingTasks = maxBlockingTasks
	}
}

// WithNonblocking indique que le pool renverra nil lorsqu'il n'y a pas de travailleurs disponibles.
func WithNonblocking(nonblocking bool) Option {
	return func(opts *Options) {
		opts.Nonblocking = nonblocking
	}
}

// WithPanicHandler configure le gestionnaire de panique.
func WithPanicHandler(panicHandler func(interface{})) Option {
	return func(opts *Options) {
		opts.PanicHandler = panicHandler
	}
}

// WithLogger configure un journal personnalisé.
func WithLogger(logger Logger) Option {
	return func(opts *Options) {
		opts.Logger = logger
	}
}

En utilisant diverses fonctions facultatives lors de l'appel de NewPool/NewPoolWithFunc, les valeurs de chaque élément de configuration dans ants.Options peuvent être définies, puis utilisées pour personnaliser le pool de goroutines.

Pool personnalisé

ants prend en charge l'instanciation du propre Pool de l'utilisateur avec une capacité de pool spécifique; en appelant la méthode NewPool, un nouveau Pool avec la capacité spécifiée peut être instancié comme suit:

p, _ := ants.NewPool(10000)

Soumission de tâches

Les tâches sont soumises en appelant la méthode ants.Submit(func()):

ants.Submit(func(){})

Ajustement dynamique de la capacité du pool de goroutines

Pour ajuster dynamiquement la capacité du pool de goroutines, vous pouvez utiliser la méthode Tune(int):

pool.Tune(1000) // Ajuster sa capacité à 1000
pool.Tune(100000) // Ajuster sa capacité à 100000

Cette méthode est sûre pour les threads.

Pré-allouer la mémoire de la file d'attente des goroutines

ants vous permet de pré-allouer de la mémoire pour l'ensemble du pool, ce qui peut améliorer les performances du pool de goroutines dans certains scénarios spécifiques. Par exemple, dans un scénario où un pool avec une très grande capacité est nécessaire et où chaque tâche à l'intérieur de la goroutine prend du temps, la pré-allocation de mémoire pour la file d'attente des goroutines réduira les réallocations de mémoire inutiles.

// ants préallouera la capacité totale du pool lorsque vous appelez cette fonction
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))

Libérer le pool

pool.Release()

Redémarrer le pool

// En appelant la méthode Reboot(), vous pouvez réactiver un pool qui a été précédemment détruit et le remettre en service.
pool.Reboot()

À propos de l'ordre d'exécution des tâches

ants ne garantit pas l'ordre d'exécution des tâches, et l'ordre d'exécution ne correspond pas nécessairement à l'ordre de soumission. Cela est dû au fait que ants traite toutes les tâches soumises de manière concurrente, et les tâches seront affectées à des travailleurs qui s'exécutent également de manière simultanée, ce qui entraîne l'exécution des tâches dans un ordre concurrent et non déterministe.