Введение в Go ants

ants - это высокопроизводительный пул горутин, который реализует планирование и управление большим количеством горутин, позволяя ограничивать и повторно использовать ресурсы, тем самым достигая более эффективного выполнения задач при разработке параллельных программ.

Особенности

  • Автоматическое планирование большого количества горутин и их повторное использование.
  • Регулярная очистка устаревших горутин для дальнейшей экономии ресурсов.
  • Предоставляет большое количество полезных интерфейсов: представление задачи, получение количества выполняющихся горутин, динамическое изменение размера пула, освобождение пула и перезапуск пула.
  • Элегантно обрабатывает паники, чтобы предотвратить сбои программы.
  • Повторное использование ресурсов значительно экономит использование памяти. В случае массовых параллельных задач он имеет более высокую производительность, чем нативная параллельность горутин.
  • Неблокирующий механизм

Как работает ants

Блок-схема

ants-flowchart-cn

Анимационные изображения

Установка

Использование версии ants v1:

go get -u github.com/panjf2000/ants

Использование версии ants v2 (включить GO111MODULE=on):

go get -u github.com/panjf2000/ants/v2

Использование

При написании параллельной программы на Go, запускающей большое количество горутин, неизбежно потребуется большое количество системных ресурсов (память, процессор). Используя ants, вы можете создать пул горутин для повторного использования и экономии ресурсов, что улучшит производительность:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("выполняется с %d\n", n)
}

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Привет, мир!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	// Использование общего пула.
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("выполняющиеся горутины: %d\n", ants.Running())
	fmt.Printf("все задачи выполнены.\n")

	// Использование пула с функцией,
	// установить емкость пула горутин равной 10 и длительность устаревания равной 1 секунде.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Представление задач поочередно.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("выполняющиеся горутины: %d\n", p.Running())
	fmt.Printf("все задачи выполнены, результат: %d\n", sum)
}

Конфигурация пула

// Option представляет собой необязательную функцию.
type Option func(opts *Options)

// Options содержит все опции, которые будут применены при создании пула ants.
type Options struct {
	// ExpiryDuration - период для обслуживающей горутины, чтобы очистить устаревшие рабочие,
	// обслуживающая горутина сканирует всех рабочих каждые `ExpiryDuration` и удаляет тех рабочих, которые не использовались
	// более `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc указывает, следует ли выполнить предварительное выделение памяти при инициализации Pool.
	PreAlloc bool

	// Max - максимальное количество блокирующих горутин на pool.Submit.
	// 0 (значение по умолчанию) означает, что нет такого ограничения.
	MaxBlockingTasks int

	// Когда Nonblocking истинно, Pool.Submit никогда не будет заблокирован.
	// ErrPoolOverload будет возвращен, когда Pool.Submit не может быть выполнен сразу.
	// Когда Nonblocking истинно, MaxBlockingTasks не работает.
	Nonblocking bool

	// PanicHandler используется для обработки паник каждой рабочей горутины.
	// если nil, то паника будет снова выброшена из рабочих горутин.
	PanicHandler func(interface{})

	// Logger - настраиваемый регистратор для логирования информации, если он не установлен,
	// используется регистратор по умолчанию из пакета log.
	Logger Logger
}

// WithOptions принимает всю конфигурацию опций.
func WithOptions(options Options) Option {
	return func(opts *Options) {
		*opts = options
	}
}

// WithExpiryDuration устанавливает интервал времени очистки горутин.
func WithExpiryDuration(expiryDuration time.Duration) Option {
	return func(opts *Options) {
		opts.ExpiryDuration = expiryDuration
	}
}

// WithPreAlloc указывает, следует ли выделять память для рабочих.
func WithPreAlloc(preAlloc bool) Option {
	return func(opts *Options) {
		opts.PreAlloc = preAlloc
	}
}

// WithMaxBlockingTasks устанавливает максимальное количество горутин, которые блокируются при достижении предела пула.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
	return func(opts *Options) {
		opts.MaxBlockingTasks = maxBlockingTasks
	}
}

// WithNonblocking указывает, что пул будет возвращать nil, когда доступных рабочих нет.
func WithNonblocking(nonblocking bool) Option {
	return func(opts *Options) {
		opts.Nonblocking = nonblocking
	}
}

// WithPanicHandler устанавливает обработчик паник.
func WithPanicHandler(panicHandler func(interface{})) Option {
	return func(opts *Options) {
		opts.PanicHandler = panicHandler
	}
}

// WithLogger устанавливает настраиваемый регистратор.
func WithLogger(logger Logger) Option {
	return func(opts *Options) {
		opts.Logger = logger
	}
}

Путем использования различных необязательных функций при вызове NewPool/NewPoolWithFunc, можно установить значения каждого элемента конфигурации в ants.Options, а затем использовать их для настройки пула горутин.

Настраиваемый пул

ants поддерживает создание собственного пула с заданной вместимостью; вызывая метод NewPool, можно создать новый пул с указанной вместимостью следующим образом:

p, _ := ants.NewPool(10000)

Представление задач

Задачи представляют собой вызов метода ants.Submit(func()):

ants.Submit(func(){}) 

Динамическое настройка вместимости пула горутин

Для динамической настройки вместимости пула горутин можно использовать метод Tune(int):

pool.Tune(1000) // Настроить вместимость на 1000
pool.Tune(100000) // Настроить вместимость на 100000

Этот метод поддерживает многопоточность.

Предварительное выделение памяти для очереди горутин

ants позволяет предварительно выделять память для всего пула, что может улучшить производительность пула горутин в определенных сценариях. Например, в ситуации, когда требуется пул с очень большой вместимостью, и каждая задача в горутине занимает много времени, предварительное выделение памяти для очереди горутин уменьшит ненужное перераспределение памяти.

//  при вызове этой функции ants предварительно выделяет память на всю вместимость пула
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))

Освобождение пула

pool.Release()

Перезагрузка пула

// Вызовом метода Reboot() можно повторно активировать ранее уничтоженный пул и вернуть его в использование.
pool.Reboot()

О порядке выполнения задач

ants не гарантирует порядок выполнения задач, и порядок выполнения не обязательно соответствует порядку отправки. Это происходит потому, что ants обрабатывает все отправленные задачи параллельно, и задачи будут назначены работникам, которые параллельно выполняются, что приводит к выполнению задач в одновременном и недетерминированном порядке.