Introducción a Go ants

ants es un pool de goroutines de alto rendimiento que implementa la programación y gestión de un gran número de goroutines, permitiendo la limitación y reutilización de recursos, logrando así una ejecución más eficiente de tareas al desarrollar programas concurrentes.

Características

  • Programación automática de un gran número de goroutines y reutilización de las mismas.
  • Limpieza regular de goroutines caducadas para ahorrar aún más recursos.
  • Proporciona un gran número de interfaces útiles: envío de tareas, obtención del número de goroutines en ejecución, ajuste dinámico del tamaño del pool, liberación del pool y reinicio del pool.
  • Manejo adecuado de panics para evitar bloqueos del programa.
  • La reutilización de recursos ahorra mucho uso de memoria. En el escenario de tareas concurrentes a gran escala, tiene un rendimiento mayor que la concurrencia nativa de goroutines.
  • Mecanismo de no bloqueo

¿Cómo funciona ants

Diagrama de flujo

ants-flowchart-cn

Imágenes animadas

Instalación

Usando la versión ants v1:

go get -u github.com/panjf2000/ants

Usando la versión ants v2 (habilitar GO111MODULE=on):

go get -u github.com/panjf2000/ants/v2

Uso

Al escribir un programa concurrente en Go que lance un gran número de goroutines, inevitablemente consumirá una gran cantidad de recursos del sistema (memoria, CPU). Al usar ants, puedes instanciar un pool de goroutines para reutilizar goroutines, ahorrando recursos y mejorando el rendimiento:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("ejecutando con %d\n", n)
}

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("¡Hola Mundo!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	// Usar el pool común.
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("goroutines en ejecución: %d\n", ants.Running())
	fmt.Printf("terminar todas las tareas.\n")

	// Usar el pool con una función,
	// establece 10 como capacidad del pool de goroutines y 1 segundo como duración caducada.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Enviar tareas una por una.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("goroutines en ejecución: %d\n", p.Running())
	fmt.Printf("terminar todas las tareas, el resultado es %d\n", sum)
}

Configuración de la piscina

// Option representa la función opcional.
type Option func(opts *Options)

// Options contiene todas las opciones que se aplicarán al instanciar una piscina de ants.
type Options struct {
	// ExpiryDuration es el periodo para que la rutina de limpieza elimine aquellos trabajadores expirados,
	// el escáner examina a todos los trabajadores cada `ExpiryDuration` y elimina aquellos trabajadores que no han sido
	// utilizados por más de `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc indica si se debe hacer una preasignación de memoria al inicializar la piscina.
	PreAlloc bool

	// Max es el número máximo de gorutinas bloqueadas en pool.Submit.
	// 0 (valor predeterminado) significa que no hay límite.
	MaxBlockingTasks int

	// Cuando Nonblocking es verdadero, Pool.Submit nunca será bloqueado.
	// ErrPoolOverload será devuelto cuando no se pueda completar Pool.Submit de inmediato.
	// Cuando Nonblocking es verdadero, MaxBlockingTasks es inoperativo.
	Nonblocking bool

	// PanicHandler se utiliza para manejar panics de cada gorutina de trabajador.
	// si es nulo, los panics serán lanzados nuevamente desde las gorutinas de trabajador.
	PanicHandler func(interface{})

	// Logger es el registrador personalizado para registrar información, si no se establece,
	// se usa el registrador estándar predeterminado del paquete log.
	Logger Logger
}

// WithOptions acepta toda la configuración de opciones.
func WithOptions(options Options) Option {
	return func(opts *Options) {
		*opts = options
	}
}

// WithExpiryDuration establece el tiempo de intervalo para limpiar las gorutinas.
func WithExpiryDuration(expiryDuration time.Duration) Option {
	return func(opts *Options) {
		opts.ExpiryDuration = expiryDuration
	}
}

// WithPreAlloc indica si se debe asignar memoria para los trabajadores.
func WithPreAlloc(preAlloc bool) Option {
	return func(opts *Options) {
		opts.PreAlloc = preAlloc
	}
}

// WithMaxBlockingTasks establece el número máximo de gorutinas que están bloqueadas cuando se alcanza la capacidad del pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
	return func(opts *Options) {
		opts.MaxBlockingTasks = maxBlockingTasks
	}
}

// WithNonblocking indica que el pool devolverá nil cuando no haya trabajadores disponibles.
func WithNonblocking(nonblocking bool) Option {
	return func(opts *Options) {
		opts.Nonblocking = nonblocking
	}
}

// WithPanicHandler configura el manejador de panic.
func WithPanicHandler(panicHandler func(interface{})) Option {
	return func(opts *Options) {
		opts.PanicHandler = panicHandler
	}
}

// WithLogger configura un registrador personalizado.
func WithLogger(logger Logger) Option {
	return func(opts *Options) {
		opts.Logger = logger
	}
}

Al usar varias funciones opcionales al llamar a NewPool/NewPoolWithFunc, se pueden establecer los valores de cada elemento de configuración en ants.Options y luego utilizarlos para personalizar la piscina de gorutinas.

Piscina personalizada

ants permite instanciar la propia Piscina del usuario con una capacidad específica; llamando al método NewPool, se puede instanciar una nueva Piscina con la capacidad especificada de la siguiente manera:

p, _ := ants.NewPool(10000)

Envío de tareas

Las tareas se envían llamando al método ants.Submit(func()):

ants.Submit(func(){})

Ajuste dinámico de la capacidad de la piscina de gorutinas

Para ajustar dinámicamente la capacidad de la piscina de gorutinas, se puede utilizar el método Tune(int):

pool.Tune(1000) // Ajusta su capacidad a 1000
pool.Tune(100000) // Ajusta su capacidad a 100000

Este método es seguro para subprocesos.

Preasignar memoria a la cola de gorutinas

ants te permite preasignar memoria para toda la piscina, lo cual puede mejorar el rendimiento de la piscina de gorutinas en ciertos escenarios específicos. Por ejemplo, en un escenario donde se necesita una piscina con una capacidad muy grande y cada tarea dentro de la gorutina lleva tiempo, preasignar memoria para la cola de gorutinas reducirá la reasignación innecesaria de memoria.

// ants preasignará toda la capacidad de la piscina cuando invoques esta función
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))

Liberar la piscina

pool.Release()

Reiniciar la piscina

// Llamando al método Reboot(), puedes reactivar una piscina que haya sido previamente destruida y volver a ponerla en uso.
pool.Reboot()

Acerca del orden de ejecución de tareas

ants no garantiza el orden de ejecución de tareas, y el orden de ejecución no necesariamente corresponde al orden de envío. Esto se debe a que ants procesa todas las tareas enviadas de forma concurrente, y las tareas se asignarán a los trabajadores que se están ejecutando de forma concurrente, lo que resulta en la ejecución de tareas en un orden concurrente y no determinístico.