Wprowadzenie do Go ants

ants to wysokowydajny pulpit goroutine, który implementuje harmonogramowanie i zarządzanie dużą liczbą gorutyn, umożliwiając ograniczenie i ponowne wykorzystanie zasobów, co pozwala na bardziej efektywne wykonywanie zadań podczas programowania współbieżnego.

Funkcje

  • Automatyczne harmonogramowanie ogromnej liczby gorutyn i ich ponowne wykorzystanie.
  • Regularne usuwanie przeterminowanych gorutyn, aby dalej oszczędzać zasoby.
  • Zapewnia dużą liczbę przydatnych interfejsów: przesyłanie zadań, uzyskiwanie liczby uruchomionych gorutyn, dynamiczna zmiana rozmiaru puli, zwalnianie puli oraz ponowne uruchamianie puli.
  • Grzeczne obsługiwanie panik w celu zapobiegania awariom programu.
  • Ponowne wykorzystywanie zasobów znacznie oszczędza użycie pamięci. W scenariuszu dużych wsadowych zadań współbieżnych ma wyższą wydajność niż natywna współbieżność gorutyn.
  • Mechanizm nieblokujący

Jak działa ants

Schemat przepływu

ants-flowchart-cn

Obrazki animowane

Instalacja

Korzystanie z wersji ants v1:

go get -u github.com/panjf2000/ants

Korzystanie z wersji ants v2 (włącz GO111MODULE=on):

go get -u github.com/panjf2000/ants/v2

Użycie

Podczas pisania programu współbieżnego Go, który uruchamia dużą liczbę gorutyn, nieuchronnie zużywa dużo zasobów systemowych (pamięć, CPU). Korzystając z ants, możesz utworzyć pulę goroutine do ponownego wykorzystywania gorutyn, oszczędzając zasoby i poprawiając wydajność:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("uruchomione z %d\n", n)
}

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Witaj, świecie!")
}

func main() {
	defer ants.Release()

	liczbaRazy := 1000

	// Użyj wspólnej puli.
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < liczbaRazy; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("uruchomione gorutyny: %d\n", ants.Running())
	fmt.Printf("ukończono wszystkie zadania.\n")

	// Użyj puli z funkcją,
	// ustaw 10 jako pojemność puli goroutine oraz 1 sekundę na czas wygaśnięcia.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Prześlij zadania po jednym.
	for i := 0; i < liczbaRazy; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("uruchomione gorutyny: %d\n", p.Running())
	fmt.Printf("ukończono wszystkie zadania, wynik to %d\n", sum)
}

Konfiguracja basenu

// Opcja reprezentuje funkcję opcjonalną.
type Option func(opts *Options)

// Opcje zawierają wszystkie opcje, które zostaną zastosowane podczas tworzenia basenu mrówek.
type Options struct {
	// ExpiryDuration to okres dla wątku sprzątającego do sprzątania przeterminowanych pracowników,
	// sprzątacz skanuje wszystkich pracowników co `ExpiryDuration` i sprząta tych pracowników, którzy nie byli
	// używani przez więcej niż `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc wskazuje, czy dokonać prealokacji pamięci podczas inicjowania basenu.
	PreAlloc bool

	// Maksymalna liczba wątków blokujących się w pool.Submit.
	// 0 (wartość domyślna) oznacza brak takiego ograniczenia.
	MaxBlockingTasks int

	// Gdy Nonblocking jest true, Pool.Submit nigdy nie będzie blokowany.
	// ErrPoolOverload zostanie zwrócony, gdy Pool.Submit nie może zostać wykonany od razu.
	// Gdy Nonblocking jest true, MaxBlockingTasks jest nieważne.
	Nonblocking bool

	// PanicHandler jest używany do obsługi panik z każdej rutyny pracowniczej.
	// jeśli jest nil, paniki zostaną ponownie wyrzucone z rutyn pracowniczych.
	PanicHandler func(interface{})

	// Logger to dostosowany rejestrator do rejestrowania informacji, jeśli nie jest ustawiony,
	// domyślny standardowy rejestrator z pakietu log jest używany.
	Logger Logger
}

// WithOptions akceptuje pełną konfigurację opcji.
func WithOptions(options Options) Option {
	return func(opts *Options) {
		*opts = options
	}
}

// WithExpiryDuration ustawia czas interwału czyszczenia wątków.
func WithExpiryDuration(expiryDuration time.Duration) Option {
	return func(opts *Options) {
		opts.ExpiryDuration = expiryDuration
	}
}

// WithPreAlloc wskazuje, czy należy zaalokować pamięć dla pracowników.
func WithPreAlloc(preAlloc bool) Option {
	return func(opts *Options) {
		opts.PreAlloc = preAlloc
	}
}

// WithMaxBlockingTasks ustawia maksymalną liczbę zablokowanych wątków, gdy osiągnie pojemność basenu.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
	return func(opts *Options) {
		opts.MaxBlockingTasks = maxBlockingTasks
	}
}

// WithNonblocking wskazuje, że basen zwróci nil, gdy nie ma dostępnych pracowników.
func WithNonblocking(nonblocking bool) Option {
	return func(opts *Options) {
		opts.Nonblocking = nonblocking
	}
}

// WithPanicHandler ustawia obsługę paniki.
func WithPanicHandler(panicHandler func(interface{})) Option {
	return func(opts *Options) {
		opts.PanicHandler = panicHandler
	}
}

// WithLogger ustawia dostosowany rejestrator.
func WithLogger(logger Logger) Option {
	return func(opts *Options) {
		opts.Logger = logger
	}
}

Za pomocą różnych funkcji opcjonalnych podczas wywoływania NewPool/NewPoolWithFunc, wartości poszczególnych elementów konfiguracji w ants.Options mogą zostać ustawione, a następnie użyte do dostosowania basenu wątków.

Niestandardowy basen

ants obsługuje tworzenie niestandardowego basenu użytkownika z określoną pojemnością basenu; przez wywołanie metody NewPool, nowy basen o określonej pojemności można utworzyć w następujący sposób:

p, _ := ants.NewPool(10000)

Przesyłanie zadania

Zadania są przesyłane przez wywołanie metody ants.Submit(func()):

ants.Submit(func(){})

Dynamiczna regulacja pojemności basenu wątków

Aby dynamicznie dostosować pojemność basenu wątków, można użyć metody Tune(int):

pool.Tune(1000) // Dostosuj jego pojemność do 1000
pool.Tune(100000) // Dostosuj jego pojemność do 100000

Ta metoda jest bezpieczna wątkowo.

Prealokacja pamięci dla kolejki wątków

ants pozwala na wstępne zaalokowanie pamięci dla całego basenu, co może poprawić wydajność basenu wątków w pewnych konkretnych scenariuszach. Na przykład, w scenariuszu, gdzie konieczny jest basen o bardzo dużej pojemności i każde zadanie wewnątrz wątku jest czasochłonne, wstępna alokacja pamięci dla kolejki wątków zmniejszy niepotrzebną realokację pamięci.

// ants wstępnie zaalokuje całą pojemność basenu, gdy wywołasz tę funkcję
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))

Zwolnienie basenu

pool.Release()

Ponowne uruchomienie basenu

// Poprzez wywołanie metody Reboot() można ponownie aktywować basen, który był wcześniej zniszczony i ponownie go wykorzystać.
pool.Reboot()

O kolejności wykonywania zadań

ants nie gwarantuje kolejności wykonania zadań, a kolejność wykonania niekoniecznie odpowiada kolejności zgłoszenia. Dotyczy to dlatego, że ants przetwarza wszystkie zgłoszone zadania równocześnie, a zadania zostaną przypisane do równocześnie działających workerów, co skutkuje wykonywaniem zadań w równoczesnej i niedeterministycznej kolejności.