Wprowadzenie do Go ants
ants
to wysokowydajny pulpit goroutine, który implementuje harmonogramowanie i zarządzanie dużą liczbą gorutyn, umożliwiając ograniczenie i ponowne wykorzystanie zasobów, co pozwala na bardziej efektywne wykonywanie zadań podczas programowania współbieżnego.
Funkcje
- Automatyczne harmonogramowanie ogromnej liczby gorutyn i ich ponowne wykorzystanie.
- Regularne usuwanie przeterminowanych gorutyn, aby dalej oszczędzać zasoby.
- Zapewnia dużą liczbę przydatnych interfejsów: przesyłanie zadań, uzyskiwanie liczby uruchomionych gorutyn, dynamiczna zmiana rozmiaru puli, zwalnianie puli oraz ponowne uruchamianie puli.
- Grzeczne obsługiwanie panik w celu zapobiegania awariom programu.
- Ponowne wykorzystywanie zasobów znacznie oszczędza użycie pamięci. W scenariuszu dużych wsadowych zadań współbieżnych ma wyższą wydajność niż natywna współbieżność gorutyn.
- Mechanizm nieblokujący
Jak działa ants
Schemat przepływu
Obrazki animowane
Instalacja
Korzystanie z wersji ants
v1:
go get -u github.com/panjf2000/ants
Korzystanie z wersji ants
v2 (włącz GO111MODULE=on):
go get -u github.com/panjf2000/ants/v2
Użycie
Podczas pisania programu współbieżnego Go, który uruchamia dużą liczbę gorutyn, nieuchronnie zużywa dużo zasobów systemowych (pamięć, CPU). Korzystając z ants
, możesz utworzyć pulę goroutine do ponownego wykorzystywania gorutyn, oszczędzając zasoby i poprawiając wydajność:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("uruchomione z %d\n", n)
}
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Witaj, świecie!")
}
func main() {
defer ants.Release()
liczbaRazy := 1000
// Użyj wspólnej puli.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < liczbaRazy; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("uruchomione gorutyny: %d\n", ants.Running())
fmt.Printf("ukończono wszystkie zadania.\n")
// Użyj puli z funkcją,
// ustaw 10 jako pojemność puli goroutine oraz 1 sekundę na czas wygaśnięcia.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Prześlij zadania po jednym.
for i := 0; i < liczbaRazy; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("uruchomione gorutyny: %d\n", p.Running())
fmt.Printf("ukończono wszystkie zadania, wynik to %d\n", sum)
}
Konfiguracja basenu
// Opcja reprezentuje funkcję opcjonalną.
type Option func(opts *Options)
// Opcje zawierają wszystkie opcje, które zostaną zastosowane podczas tworzenia basenu mrówek.
type Options struct {
// ExpiryDuration to okres dla wątku sprzątającego do sprzątania przeterminowanych pracowników,
// sprzątacz skanuje wszystkich pracowników co `ExpiryDuration` i sprząta tych pracowników, którzy nie byli
// używani przez więcej niż `ExpiryDuration`.
ExpiryDuration time.Duration
// PreAlloc wskazuje, czy dokonać prealokacji pamięci podczas inicjowania basenu.
PreAlloc bool
// Maksymalna liczba wątków blokujących się w pool.Submit.
// 0 (wartość domyślna) oznacza brak takiego ograniczenia.
MaxBlockingTasks int
// Gdy Nonblocking jest true, Pool.Submit nigdy nie będzie blokowany.
// ErrPoolOverload zostanie zwrócony, gdy Pool.Submit nie może zostać wykonany od razu.
// Gdy Nonblocking jest true, MaxBlockingTasks jest nieważne.
Nonblocking bool
// PanicHandler jest używany do obsługi panik z każdej rutyny pracowniczej.
// jeśli jest nil, paniki zostaną ponownie wyrzucone z rutyn pracowniczych.
PanicHandler func(interface{})
// Logger to dostosowany rejestrator do rejestrowania informacji, jeśli nie jest ustawiony,
// domyślny standardowy rejestrator z pakietu log jest używany.
Logger Logger
}
// WithOptions akceptuje pełną konfigurację opcji.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
// WithExpiryDuration ustawia czas interwału czyszczenia wątków.
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
// WithPreAlloc wskazuje, czy należy zaalokować pamięć dla pracowników.
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
// WithMaxBlockingTasks ustawia maksymalną liczbę zablokowanych wątków, gdy osiągnie pojemność basenu.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
// WithNonblocking wskazuje, że basen zwróci nil, gdy nie ma dostępnych pracowników.
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
// WithPanicHandler ustawia obsługę paniki.
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
// WithLogger ustawia dostosowany rejestrator.
func WithLogger(logger Logger) Option {
return func(opts *Options) {
opts.Logger = logger
}
}
Za pomocą różnych funkcji opcjonalnych podczas wywoływania NewPool
/NewPoolWithFunc
, wartości poszczególnych elementów konfiguracji w ants.Options
mogą zostać ustawione, a następnie użyte do dostosowania basenu wątków.
Niestandardowy basen
ants
obsługuje tworzenie niestandardowego basenu użytkownika z określoną pojemnością basenu; przez wywołanie metody NewPool
, nowy basen o określonej pojemności można utworzyć w następujący sposób:
p, _ := ants.NewPool(10000)
Przesyłanie zadania
Zadania są przesyłane przez wywołanie metody ants.Submit(func())
:
ants.Submit(func(){})
Dynamiczna regulacja pojemności basenu wątków
Aby dynamicznie dostosować pojemność basenu wątków, można użyć metody Tune(int)
:
pool.Tune(1000) // Dostosuj jego pojemność do 1000
pool.Tune(100000) // Dostosuj jego pojemność do 100000
Ta metoda jest bezpieczna wątkowo.
Prealokacja pamięci dla kolejki wątków
ants
pozwala na wstępne zaalokowanie pamięci dla całego basenu, co może poprawić wydajność basenu wątków w pewnych konkretnych scenariuszach. Na przykład, w scenariuszu, gdzie konieczny jest basen o bardzo dużej pojemności i każde zadanie wewnątrz wątku jest czasochłonne, wstępna alokacja pamięci dla kolejki wątków zmniejszy niepotrzebną realokację pamięci.
// ants wstępnie zaalokuje całą pojemność basenu, gdy wywołasz tę funkcję
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
Zwolnienie basenu
pool.Release()
Ponowne uruchomienie basenu
// Poprzez wywołanie metody Reboot() można ponownie aktywować basen, który był wcześniej zniszczony i ponownie go wykorzystać.
pool.Reboot()
O kolejności wykonywania zadań
ants
nie gwarantuje kolejności wykonania zadań, a kolejność wykonania niekoniecznie odpowiada kolejności zgłoszenia. Dotyczy to dlatego, że ants
przetwarza wszystkie zgłoszone zadania równocześnie, a zadania zostaną przypisane do równocześnie działających workerów, co skutkuje wykonywaniem zadań w równoczesnej i niedeterministycznej kolejności.