Introduzione alle formiche Go

ants è un pool di goroutine ad alte prestazioni che implementa la pianificazione e la gestione di un gran numero di goroutine, permettendo il limite e il riutilizzo delle risorse, conseguendo così un'esecuzione più efficiente dei compiti nello sviluppo di programmi concorrenti.

Caratteristiche

  • Pianificazione automatica di un gran numero di goroutine e loro riutilizzo.
  • Pulizia regolare delle goroutine scadute per risparmiare ulteriormente risorse.
  • Fornisce un gran numero di interfacce utili: sottomissione di task, ottenimento del numero di goroutine in esecuzione, regolazione dinamica delle dimensioni del pool, rilascio del pool e ripartenza del pool.
  • Gestione operazioni di panico in modo elegante per prevenire crash del programma.
  • Il riutilizzo delle risorse risparmia notevolmente l'uso della memoria. Nello scenario di compiti di concorrenza batch su larga scala, ha prestazioni superiori rispetto alla concorrenza nativa di goroutine.
  • Meccanismo non bloccante.

Come funziona ants

Diagramma di flusso

ants-flowchart-cn

Immagini animate

Installazione

Utilizzo della versione ants v1:

go get -u github.com/panjf2000/ants

Utilizzo della versione ants v2 (abilita GO111MODULE=on):

go get -u github.com/panjf2000/ants/v2

Utilizzo

Quando si scrive un programma concorrente Go che avvia un gran numero di goroutine, inevitabilmente si consumerà una grande quantità di risorse di sistema (memoria, CPU). Utilizzando ants, è possibile istanziare un pool di goroutine per riutilizzare le goroutine, risparmiando risorse e migliorando le prestazioni:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("esegui con %d\n", n)
}

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Ciao Mondo!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	// Utilizza il pool comune.
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("goroutine in esecuzione: %d\n", ants.Running())
	fmt.Printf("completati tutti i compiti.\n")

	// Utilizza il pool con una funzione,
	// imposta 10 come capacità del pool di goroutine e 1 secondo come durata scaduta.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Sottomette i compiti uno per uno.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("goroutine in esecuzione: %d\n", p.Running())
	fmt.Printf("completati tutti i compiti, il risultato è %d\n", sum)
}

Configurazione del pool

// Option rappresenta la funzione opzionale.
type Option func(opts *Options)

// Options contiene tutte le opzioni che verranno applicate durante l'istanziazione di un pool ants.
type Options struct {
	// ExpiryDuration è il periodo per il processo di pulizia dei worker scaduti,
	// il processo esegue la scansione di tutti i worker ogni `ExpiryDuration` e pulisce i worker che non sono stati
	// utilizzati per più di `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc indica se effettuare un pre-allocazione di memoria durante l'inizializzazione del Pool.
	PreAlloc bool

	// Numero massimo di goroutine bloccate su pool.Submit.
	// 0 (valore predefinito) significa nessun limite.
	MaxBlockingTasks int

	// Quando Nonblocking è true, Pool.Submit non verrà mai bloccato.
	// ErrPoolOverload sarà restituito quando Pool.Submit non può essere eseguito immediatamente.
	// Quando Nonblocking è true, MaxBlockingTasks è inoperativo.
	Nonblocking bool

	// PanicHandler è utilizzato per gestire le eccezioni da ogni goroutine worker.
	// se è nullo, le eccezioni verranno nuovamente generate dalle goroutine worker.
	PanicHandler func(interface{})

	// Logger è il logger personalizzato per registrare le informazioni, se non è impostato,
	// viene utilizzato il logger standard predefinito del pacchetto di log.
	Logger Logger
}

// WithOptions accetta l'intera configurazione delle opzioni.
func WithOptions(options Options) Option {
	return func(opts *Options) {
		*opts = options
	}
}

// WithExpiryDuration imposta l'intervallo di tempo per la pulizia delle goroutine.
func WithExpiryDuration(expiryDuration time.Duration) Option {
	return func(opts *Options) {
		opts.ExpiryDuration = expiryDuration
	}
}

// WithPreAlloc indica se dovrebbe effettuare un'allocazione di memoria per i worker.
func WithPreAlloc(preAlloc bool) Option {
	return func(opts *Options) {
		opts.PreAlloc = preAlloc
	}
}

// WithMaxBlockingTasks imposta il numero massimo di goroutine bloccate quando si raggiunge la capacità del pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
	return func(opts *Options) {
		opts.MaxBlockingTasks = maxBlockingTasks
	}
}

// WithNonblocking indica che il pool restituirà nil quando non ci sono worker disponibili.
func WithNonblocking(nonblocking bool) Option {
	return func(opts *Options) {
		opts.Nonblocking = nonblocking
	}
}

// WithPanicHandler imposta il gestore di eccezioni.
func WithPanicHandler(panicHandler func(interface{})) Option {
	return func(opts *Options) {
		opts.PanicHandler = panicHandler
	}
}

// WithLogger imposta un logger personalizzato.
func WithLogger(logger Logger) Option {
	return func(opts *Options) {
		opts.Logger = logger
	}
}

Utilizzando diverse funzioni opzionali durante la chiamata a NewPool/NewPoolWithFunc, è possibile impostare i valori di ciascuna voce di configurazione in ants.Options e poi usarli per personalizzare il pool di goroutine.

Pool personalizzato

ants supporta l'istanziazione del proprio Pool dell'utente con una capacità specifica; chiamando il metodo NewPool, è possibile istanziare un nuovo pool con la capacità specificata come segue:

p, _ := ants.NewPool(10000)

Invio delle attività

Le attività vengono inviate chiamando il metodo ants.Submit(func()):

ants.Submit(func() {})

Regolazione dinamica della capacità del pool di goroutine

Per regolare dinamicamente la capacità del pool di goroutine, è possibile utilizzare il metodo Tune(int):

pool.Tune(1000) // Regola la sua capacità a 1000
pool.Tune(100000) // Regola la sua capacità a 100000

Questo metodo è thread-safe.

Pre-allocazione della memoria della coda di goroutine

ants ti consente di pre-allocare la memoria per l'intero pool, il che può migliorare le prestazioni del pool di goroutine in determinati scenari specifici. Ad esempio, in uno scenario in cui è necessario un pool con una capacità molto grande e ogni attività all'interno della goroutine è dispendiosa in termini di tempo, preallocare la memoria per la coda di goroutine ridurrà la riallocazione non necessaria della memoria.

// ants pre-asserisce l'intera capacità del pool quando si invoca questa funzione
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))

Rilascio del Pool

pool.Release()

Riavvio del Pool

// Chiamando il metodo Reboot(), è possibile riattivare un pool che è stato precedentemente distrutto e rimetterlo in uso.
pool.Reboot()

Riguardo all'ordine di esecuzione dei compiti

ants non garantisce l'ordine di esecuzione dei compiti e l'ordine di esecuzione non corrisponde necessariamente all'ordine di invio. Questo perché ants elabora contemporaneamente tutti i compiti inviati e essi verranno assegnati ai worker che stanno eseguendo contemporaneamente, risultando nell'esecuzione dei compiti in un ordine concorrente e non deterministico.