Введение в Go ants
ants
- это высокопроизводительный пул горутин, который реализует планирование и управление большим количеством горутин, позволяя ограничивать и повторно использовать ресурсы, тем самым достигая более эффективного выполнения задач при разработке параллельных программ.
Особенности
- Автоматическое планирование большого количества горутин и их повторное использование.
- Регулярная очистка устаревших горутин для дальнейшей экономии ресурсов.
- Предоставляет большое количество полезных интерфейсов: представление задачи, получение количества выполняющихся горутин, динамическое изменение размера пула, освобождение пула и перезапуск пула.
- Элегантно обрабатывает паники, чтобы предотвратить сбои программы.
- Повторное использование ресурсов значительно экономит использование памяти. В случае массовых параллельных задач он имеет более высокую производительность, чем нативная параллельность горутин.
- Неблокирующий механизм
Как работает ants
Блок-схема
Анимационные изображения
Установка
Использование версии ants
v1:
go get -u github.com/panjf2000/ants
Использование версии ants
v2 (включить GO111MODULE=on):
go get -u github.com/panjf2000/ants/v2
Использование
При написании параллельной программы на Go, запускающей большое количество горутин, неизбежно потребуется большое количество системных ресурсов (память, процессор). Используя ants
, вы можете создать пул горутин для повторного использования и экономии ресурсов, что улучшит производительность:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("выполняется с %d\n", n)
}
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Привет, мир!")
}
func main() {
defer ants.Release()
runTimes := 1000
// Использование общего пула.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("выполняющиеся горутины: %d\n", ants.Running())
fmt.Printf("все задачи выполнены.\n")
// Использование пула с функцией,
// установить емкость пула горутин равной 10 и длительность устаревания равной 1 секунде.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Представление задач поочередно.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("выполняющиеся горутины: %d\n", p.Running())
fmt.Printf("все задачи выполнены, результат: %d\n", sum)
}
Конфигурация пула
// Option представляет собой необязательную функцию.
type Option func(opts *Options)
// Options содержит все опции, которые будут применены при создании пула ants.
type Options struct {
// ExpiryDuration - период для обслуживающей горутины, чтобы очистить устаревшие рабочие,
// обслуживающая горутина сканирует всех рабочих каждые `ExpiryDuration` и удаляет тех рабочих, которые не использовались
// более `ExpiryDuration`.
ExpiryDuration time.Duration
// PreAlloc указывает, следует ли выполнить предварительное выделение памяти при инициализации Pool.
PreAlloc bool
// Max - максимальное количество блокирующих горутин на pool.Submit.
// 0 (значение по умолчанию) означает, что нет такого ограничения.
MaxBlockingTasks int
// Когда Nonblocking истинно, Pool.Submit никогда не будет заблокирован.
// ErrPoolOverload будет возвращен, когда Pool.Submit не может быть выполнен сразу.
// Когда Nonblocking истинно, MaxBlockingTasks не работает.
Nonblocking bool
// PanicHandler используется для обработки паник каждой рабочей горутины.
// если nil, то паника будет снова выброшена из рабочих горутин.
PanicHandler func(interface{})
// Logger - настраиваемый регистратор для логирования информации, если он не установлен,
// используется регистратор по умолчанию из пакета log.
Logger Logger
}
// WithOptions принимает всю конфигурацию опций.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
// WithExpiryDuration устанавливает интервал времени очистки горутин.
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
// WithPreAlloc указывает, следует ли выделять память для рабочих.
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
// WithMaxBlockingTasks устанавливает максимальное количество горутин, которые блокируются при достижении предела пула.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
// WithNonblocking указывает, что пул будет возвращать nil, когда доступных рабочих нет.
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
// WithPanicHandler устанавливает обработчик паник.
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
// WithLogger устанавливает настраиваемый регистратор.
func WithLogger(logger Logger) Option {
return func(opts *Options) {
opts.Logger = logger
}
}
Путем использования различных необязательных функций при вызове NewPool
/NewPoolWithFunc
, можно установить значения каждого элемента конфигурации в ants.Options
, а затем использовать их для настройки пула горутин.
Настраиваемый пул
ants
поддерживает создание собственного пула с заданной вместимостью; вызывая метод NewPool
, можно создать новый пул с указанной вместимостью следующим образом:
p, _ := ants.NewPool(10000)
Представление задач
Задачи представляют собой вызов метода ants.Submit(func())
:
ants.Submit(func(){})
Динамическое настройка вместимости пула горутин
Для динамической настройки вместимости пула горутин можно использовать метод Tune(int)
:
pool.Tune(1000) // Настроить вместимость на 1000
pool.Tune(100000) // Настроить вместимость на 100000
Этот метод поддерживает многопоточность.
Предварительное выделение памяти для очереди горутин
ants
позволяет предварительно выделять память для всего пула, что может улучшить производительность пула горутин в определенных сценариях. Например, в ситуации, когда требуется пул с очень большой вместимостью, и каждая задача в горутине занимает много времени, предварительное выделение памяти для очереди горутин уменьшит ненужное перераспределение памяти.
// при вызове этой функции ants предварительно выделяет память на всю вместимость пула
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
Освобождение пула
pool.Release()
Перезагрузка пула
// Вызовом метода Reboot() можно повторно активировать ранее уничтоженный пул и вернуть его в использование.
pool.Reboot()
О порядке выполнения задач
ants
не гарантирует порядок выполнения задач, и порядок выполнения не обязательно соответствует порядку отправки. Это происходит потому, что ants
обрабатывает все отправленные задачи параллельно, и задачи будут назначены работникам, которые параллельно выполняются, что приводит к выполнению задач в одновременном и недетерминированном порядке.