Tunny - это библиотека на языке Golang для создания и управления пулами goroutine, которые позволяют ограничивать работу из любого количества goroutine с использованием синхронных API.
Когда ваша работа поступает из произвольного числа асинхронных источников, но ваша параллельная обработка ограничена, очень полезен фиксированный пул goroutine. Например, при обработке CPU-интенсивных задач HTTP-запросов, вы можете создать пул размером, соответствующим количеству процессоров.
Установка
go get github.com/Jeffail/tunny
В альтернативе, используя dep:
dep ensure -add github.com/Jeffail/tunny
Использование
В большинстве случаев вашу тяжелую работу можно представить простой func()
, в этом случае вы можете использовать NewFunc
. Давайте посмотрим, как использовать наш пример HTTP-запросов для подсчета CPU:
package main
import (
"io/ioutil"
"net/http"
"runtime"
"github.com/Jeffail/tunny"
)
func main() {
numCPUs := runtime.NumCPU()
pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
var result []byte
// TODO: Выполните некоторые CPU-интенсивные операции, используя payload
return result
})
defer pool.Close()
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
input, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Internal error", http.StatusInternalServerError)
}
defer r.Body.Close()
// Импортируйте эту работу в наш пул. Этот вызов синхронный и заблокирует до завершения работы.
result := pool.Process(input)
w.Write(result.([]byte))
})
http.ListenAndServe(":8080", nil)
}
Tunny также поддерживает тайм-ауты. Вы можете заменить вышеупомянутый вызов Process
следующим кодом:
result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
http.Error(w, "Request timed out", http.StatusRequestTimeout)
}
Вы также можете использовать контекст запроса (или любой другой контекст) для обработки тайм-аутов и сроков выполнения. Просто замените вызов Process
следующим кодом:
result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
http.Error(w, "Request timed out", http.StatusRequestTimeout)
}
Изменение размера пула
Вы можете использовать SetSize(int)
для изменения размера пула Tunny в любое время.
pool.SetSize(10) // 10 goroutine
pool.SetSize(100) // 100 goroutine
Это безопасно, даже если другие goroutine все еще обрабатываются.
Состояние goroutine
Иногда каждая goroutine в пуле Tunny требует своего собственного управляющего состояния. В этом случае вы должны реализовать tunny.Worker
, который включает вызовы для завершения, прерывания (если задача занимает слишком долго и больше не требуется) и блокирования выделения следующей задачи до тех пор, пока не будет выполнено определенное условие.
При создании пула с типом Worker
вам необходимо предоставить конструктор для генерации вашей собственной реализации:
pool := tunny.New(poolSize, func() Worker {
// TODO: Выполните выделение состояния для каждой goroutine здесь.
return newCustomWorker()
})
Таким образом, Tunny может управлять созданием и уничтожением типа Worker
при изменении размера пула.
Упорядочивание
Отложенные задачи не гарантируются обработкой в определенном порядке. Из-за текущей реализации каналов и блоков select
, отложенные стеки задач будут обрабатываться как очередь FIFO. Однако это поведение не является частью спецификации и на него нельзя полагаться.