Tunny - это библиотека на языке Golang для создания и управления пулами goroutine, которые позволяют ограничивать работу из любого количества goroutine с использованием синхронных API.

Когда ваша работа поступает из произвольного числа асинхронных источников, но ваша параллельная обработка ограничена, очень полезен фиксированный пул goroutine. Например, при обработке CPU-интенсивных задач HTTP-запросов, вы можете создать пул размером, соответствующим количеству процессоров.

Установка

go get github.com/Jeffail/tunny

В альтернативе, используя dep:

dep ensure -add github.com/Jeffail/tunny

Использование

В большинстве случаев вашу тяжелую работу можно представить простой func(), в этом случае вы можете использовать NewFunc. Давайте посмотрим, как использовать наш пример HTTP-запросов для подсчета CPU:

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Выполните некоторые CPU-интенсивные операции, используя payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
		}
		defer r.Body.Close()

		// Импортируйте эту работу в наш пул. Этот вызов синхронный и заблокирует до завершения работы.
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}

Tunny также поддерживает тайм-ауты. Вы можете заменить вышеупомянутый вызов Process следующим кодом:

result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

Вы также можете использовать контекст запроса (или любой другой контекст) для обработки тайм-аутов и сроков выполнения. Просто замените вызов Process следующим кодом:

result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

Изменение размера пула

Вы можете использовать SetSize(int) для изменения размера пула Tunny в любое время.

pool.SetSize(10) // 10 goroutine
pool.SetSize(100) // 100 goroutine

Это безопасно, даже если другие goroutine все еще обрабатываются.

Состояние goroutine

Иногда каждая goroutine в пуле Tunny требует своего собственного управляющего состояния. В этом случае вы должны реализовать tunny.Worker, который включает вызовы для завершения, прерывания (если задача занимает слишком долго и больше не требуется) и блокирования выделения следующей задачи до тех пор, пока не будет выполнено определенное условие.

При создании пула с типом Worker вам необходимо предоставить конструктор для генерации вашей собственной реализации:

pool := tunny.New(poolSize, func() Worker {
	// TODO: Выполните выделение состояния для каждой goroutine здесь.
	return newCustomWorker()
})

Таким образом, Tunny может управлять созданием и уничтожением типа Worker при изменении размера пула.

Упорядочивание

Отложенные задачи не гарантируются обработкой в определенном порядке. Из-за текущей реализации каналов и блоков select, отложенные стеки задач будут обрабатываться как очередь FIFO. Однако это поведение не является частью спецификации и на него нельзя полагаться.