Go antsの概要

antsは高性能なゴルーチンプールであり、大量のゴルーチンのスケジューリングと管理を実装し、リソースの制限と再利用を可能にすることで、並行プログラムの開発時にタスクの効率的な実行を実現します。

特長

  • 大量のゴルーチンを自動的にスケジューリングし再利用する
  • 期限切れのゴルーチンを定期的にクリーンアップしてリソースを節約する
  • タスクの提出、実行中のゴルーチン数の取得、プールサイズの動的調整、プールの解放、プールの再起動など多くの有用なインターフェースを提供する
  • パニックをきちんと処理してプログラムのクラッシュを防止する
  • リソースの再利用によりメモリ使用量を大幅に節約する。大規模なバッチ並行タスクのシナリオでは、ネイティブゴルーチン並行より高いパフォーマンスを発揮する
  • ノンブロッキングメカニズム

antsの動作原理

フローチャート

ants-flowchart-cn

アニメーション画像

インストール

ants v1 バージョンを使用する場合:

go get -u github.com/panjf2000/ants

ants v2 バージョンを使用する場合(GO111MODULE=onに設定する必要があります):

go get -u github.com/panjf2000/ants/v2

使用法

大量のゴルーチンを起動するGo並行プログラムを記述する際には、多くのシステムリソース(メモリ、CPU)を消費することになります。antsを使用することで、ゴルーチンを再利用するためのゴルーチンプールをインスタンス化することができ、リソースを節約し、パフォーマンスを向上させることができます。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("%dで実行\n", n)
}

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	// 一般的なプールを使用する
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("実行中のゴルーチン数: %d\n", ants.Running())
	fmt.Println("すべてのタスクを完了")

	// 関数を持つプールを使用
	// ゴルーチンプールの容量を10に設定し、期限切れの時間を1秒に設定
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// タスクを1つずつ提出する
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("実行中のゴルーチン数: %d\n", p.Running())
	fmt.Printf("すべてのタスクを完了し、結果は %d です\n", sum)
}

プールの構成

// Optionはオプションの関数を表します。
type Option func(opts *Options)

// Optionsにはantsプールをインスタンス化する際に適用されるすべてのオプションが含まれています。
type Options struct {
	// ExpiryDurationは、scavengerゴールーチンが期限切れのワーカーをクリアするための期間です。
	// scavengerは全ワーカーを`ExpiryDuration`ごとにスキャンし、`ExpiryDuration`よりも使用されていないワーカーをクリアします。
	ExpiryDuration time.Duration

	// PreAllocは、プールを初期化する際にメモリの事前割り当てを行うかどうかを示します。
	PreAlloc bool

	// プール.Submitでブロックされるゴルーチンの最大数。
	// 0(デフォルト値)はそのような制限はありません。
	MaxBlockingTasks int

	// Nonblockingがtrueの場合、Pool.Submitは決してブロックされません。
	// Pool.Submitが一度に実行できない場合、ErrPoolOverloadが返されます。
	// Nonblockingがtrueの場合、MaxBlockingTasksは無効です。
	Nonblocking bool

	// PanicHandlerは各ワーカーゴルーチンからのパニックを処理するために使用されます。
	// nilの場合、パニックはワーカーゴルーチンから再度スローされます。
	PanicHandler func(interface{})

	// Loggerは情報をログに記録するためのカスタマイズされたロガーです。設定されていない場合、
	// logパッケージのデフォルト標準ロガーが使用されます。
	Logger Logger
}

// WithOptionsは全オプションの設定を受け付けます。
func WithOptions(options Options) Option {
	return func(opts *Options) {
		*opts = options
	}
}

// WithExpiryDurationはゴルーチンをクリアする間隔時間を設定します。
func WithExpiryDuration(expiryDuration time.Duration) Option {
	return func(opts *Options) {
		opts.ExpiryDuration = expiryDuration
	}
}

// WithPreAllocはワーカーのためにメモリを割り当てるかどうかを示します。
func WithPreAlloc(preAlloc bool) Option {
	return func(opts *Options) {
		opts.PreAlloc = preAlloc
	}
}

// WithMaxBlockingTasksはプールの容量に達した際にブロックされるゴルーチンの最大数を設定します。
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
	return func(opts *Options) {
		opts.MaxBlockingTasks = maxBlockingTasks
	}
}

// WithNonblockingは利用可能なワーカーがない場合にプールがnilを返すことを示します。
func WithNonblocking(nonblocking bool) Option {
	return func(opts *Options) {
		opts.Nonblocking = nonblocking
	}
}

// WithPanicHandlerはパニックハンドラを設定します。
func WithPanicHandler(panicHandler func(interface{})) Option {
	return func(opts *Options) {
		opts.PanicHandler = panicHandler
	}
}

// WithLoggerはカスタマイズされたロガーを設定します。
func WithLogger(logger Logger) Option {
	return func(opts *Options) {
		opts.Logger = logger
	}
}

NewPool/NewPoolWithFuncを呼び出す際に、ants.Optionsの各構成項目の値を設定するためのさまざまなオプション関数を使用し、それらを使用してゴルーチンプールをカスタマイズできます。

カスタムプール

antsは特定のプール容量でユーザー独自のプールをインスタンス化することをサポートしており、NewPoolメソッドを呼び出すことで、次のように指定された容量で新しいプールをインスタンス化できます。

p, _ := ants.NewPool(10000)

タスクの提出

タスクはants.Submit(func())メソッドを呼び出すことで提出されます。

ants.Submit(func(){})

ゴルーチンプール容量の動的調整

ゴルーチンプールの容量を動的に調整するには、Tune(int)メソッドを使用できます。

pool.Tune(1000) // 容量を1000に調整
pool.Tune(100000) // 容量を100000に調整

このメソッドはスレッドセーフです。

ゴルーチンキューのメモリを事前に割り当てる

antsではプール全体のメモリを事前に割り当てることができ、これは特定のシナリオでゴルーチンプールのパフォーマンスを向上させることができます。たとえば、非常に大きな容量が必要なプールであり、各ゴルーチン内のタスクが時間がかかる場合、ゴルーチンキューのためのメモリを事前に割り当てることで不要なメモリ再割り当てを減らすことができます。

// この関数を呼び出すと、antsはプールの全体の容量を事前にmallocします
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))

プールの解放

pool.Release()

プールの再起動

// Reboot()メソッドを呼び出すことで、以前に破棄されたプールを再アクティブ化して使用可能にすることができます。
pool.Reboot()

タスクの実行順について

ants はタスクの実行順を保証せず、実行順は必ずしも提出の順序に対応しません。これは、ants がすべての提出されたタスクを同時に処理し、タスクは同時に実行中のワーカーに割り当てられるため、タスクの実行が並行して非決定的な順序で行われるためです。