Giới thiệu về Go ants

ants là một pool goroutine hiệu suất cao điều phối và quản lý một lượng lớn goroutines, cho phép giới hạn và tái sử dụng tài nguyên, từ đó đạt được hiệu suất thực hiện công việc hiệu quả hơn khi phát triển chương trình song song.

Tính năng

  • Tự động lên lịch một số lượng lớn goroutines và tái sử dụng chúng.
  • Thường xuyên dọn dẹp goroutines hết hạn để tiết kiệm tài nguyên hơn.
  • Cung cấp một số lượng lớn các giao diện hữu ích: nộp nhiệm vụ, lấy số lượng goroutines đang chạy, điều chỉnh kích thước pool động, giải phóng pool và khởi động lại pool.
  • Xử lý tinh tế các sự cố để ngăn chặn tiến trình bị treo.
  • Tái sử dụng tài nguyên giúp tiết kiệm bộ nhớ rất nhiều. Trong kịch bản nhiệm vụ song song hàng loạt quy mô lớn, có hiệu suất cao hơn so với song song goroutine gốc.
  • Cơ chế không chặn

Cách thức hoạt động của ants

Sơ đồ luồng

ants-flowchart-cn

Hình ảnh động

Cài đặt

Sử dụng ants phiên bản v1:

go get -u github.com/panjf2000/ants

Sử dụng ants phiên bản v2 (bật GO111MODULE=on):

go get -u github.com/panjf2000/ants/v2

Sử dụng

Khi viết một chương trình Go song song mà khởi chạy một số lượng lớn goroutines, điều này sẽ không tránh khỏi tốn tài nguyên hệ thống (bộ nhớ, CPU) một cách lớn. Bằng cách sử dụng ants, bạn có thể khởi tạo một pool goroutine để tái sử dụng goroutines, tiết kiệm tài nguyên và cải thiện hiệu suất:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("chạy với %d\n", n)
}

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Xin chào Thế Giới!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	// Sử dụng pool thông thường.
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("goroutines đang chạy: %d\n", ants.Running())
	fmt.Printf("hoàn thành tất cả nhiệm vụ.\n")

	// Sử dụng pool với một hàm,
	// đặt 10 cho dung lượng của pool goroutine và 1 giây cho thời gian hết hạn.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Nộp nhiệm vụ từng cái một.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("goroutines đang chạy: %d\n", p.Running())
	fmt.Printf("hoàn thành tất cả nhiệm vụ, kết quả là %d\n", sum)
}

Cấu hình Bể bơi

// Option đại diện cho hàm tùy chọn.
type Option func(opts *Options)

// Options chứa tất cả các tùy chọn sẽ được áp dụng khi khởi tạo một bể bơi ants.
type Options struct {
	// ExpiryDuration là khoảng thời gian cho rut ruot rác để dọn dẹp các công nhân đã hết hạn. 
	// rut ruot rác quét tất cả các công nhân mỗi `ExpiryDuration` và dọn dẹp các công nhân 
	// mà chưa được sử dụng hơn `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc chỉ ra xem có nên định trước bộ nhớ khi khởi tạo Pool hay không.
	PreAlloc bool

	// Số lượng tối đa của goroutine bị chặn trên pool.Submit.
	// 0 (giá trị mặc định) có nghĩa là không có giới hạn như vậy.
	MaxBlockingTasks int

	// Khi Nonblocking là true, Pool.Submit sẽ không bao giờ bị chặn.
	// ErrPoolOverload sẽ được trả về khi Pool.Submit không thể được thực hiện ngay lập tức.
	// Khi Nonblocking là true, MaxBlockingTasks không hoạt động.
	Nonblocking bool

	// PanicHandler được sử dụng để xử lý các sự cố từ mỗi goroutine công nhân.
	// Nếu nil, sự cố sẽ được ném ra từ goroutine công nhân.
	PanicHandler func(interface{})

	// Logger là bộ ghi tùy chỉnh để ghi thông tin, nếu nó không được đặt,
	// bộ ghi mặc định từ gói nhật ký sẽ được sử dụng.
	Logger Logger
}

// WithOptions chấp nhận toàn bộ cấu hình tùy chọn.
func WithOptions(options Options) Option {
	return func(opts *Options) {
		*opts = options
	}
}

// WithExpiryDuration thiết lập thời gian chu kỳ dọn dẹp các goroutine.
func WithExpiryDuration(expiryDuration time.Duration) Option {
	return func(opts *Options) {
		opts.ExpiryDuration = expiryDuration
	}
}

// WithPreAlloc cho biết liệu có nên cấp phát cho công nhân hay không.
func WithPreAlloc(preAlloc bool) Option {
	return func(opts *Options) {
		opts.PreAlloc = preAlloc
	}
}

// WithMaxBlockingTasks thiết lập số lượng tối đa của goroutine bị chặn khi nó đạt đến sức chứa của pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
	return func(opts *Options) {
		opts.MaxBlockingTasks = maxBlockingTasks
	}
}

// WithNonblocking cho biết rằng pool sẽ trả về nil khi không có công nhân khả dụng.
func WithNonblocking(nonblocking bool) Option {
	return func(opts *Options) {
		opts.Nonblocking = nonblocking
	}
}

// WithPanicHandler thiết lập bộ xử lý sự cố.
func WithPanicHandler(panicHandler func(interface{})) Option {
	return func(opts *Options) {
		opts.PanicHandler = panicHandler
	}
}

// WithLogger thiết lập một bộ ghi tùy chỉnh.
func WithLogger(logger Logger) Option {
	return func(opts *Options) {
		opts.Logger = logger
	}
}

Bằng cách sử dụng các hàm tùy chọn khác nhau khi gọi NewPool/NewPoolWithFunc, giá trị của mỗi mục cấu hình trong ants.Options có thể được thiết lập, sau đó được sử dụng để tùy chỉnh bể goroutine.

Bể bơi Tùy chỉnh

ants hỗ trợ khởi tạo Bể bơi của người dùng với một dung lượng bể bơi cụ thể; bằng cách gọi phương thức NewPool, một Bể bơi mới với dung lượng được chỉ định có thể được khởi tạo như sau:

p, _ := ants.NewPool(10000)

Nộp Nhiệm vụ

Nhiệm vụ được nộp bằng cách gọi phương thức ants.Submit(func()):

ants.Submit(func(){})

Điều chỉnh Động bộ của Dung lượng Bể goroutine

Để điều chỉnh động bộ của dung lượng bể goroutine, bạn có thể sử dụng phương thức Tune(int):

pool.Tune(1000) // Điều chỉnh sức chứa của nó thành 1000
pool.Tune(100000) // Điều chỉnh sức chứa của nó thành 100000

Phương thức này an toàn với luồng.

Định trước bộ nhớ cho hàng đợi goroutine

ants cho phép bạn định trước bộ nhớ cho toàn bộ bể bơi, điều này có thể cải thiện hiệu suất của bể goroutine trong một số tình huống cụ thể. Ví dụ, trong một tình huống cần một bể bơi với dung lượng rất lớn và mỗi nhiệm vụ trong goroutine mất thời gian, việc định trước bộ nhớ cho hàng đợi goroutine sẽ giảm thiểu việc cấp phát lại bộ nhớ không cần thiết.

// ants sẽ cấp phát trước toàn bộ dung lượng của bể bơi khi bạn gọi hàm này
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))

Giải phóng Bể Bơi

pool.Release()

Khởi động lại Bể Bơi

// Bằng cách gọi phương thức Reboot(), bạn có thể kích hoạt lại một bể bơi đã bị hủy trước đó và đưa nó trở lại sử dụng.
pool.Reboot()

Về thứ tự thực hiện công việc

ants không đảm bảo thứ tự thực hiện công việc và thứ tự thực hiện không nhất thiết phải tương ứng với thứ tự gửi. Điều này xảy ra vì ants xử lý tất cả các công việc gửi cùng một lúc, và các công việc sẽ được giao cho các worker đang chạy đồng thời, dẫn đến việc thực hiện các công việc theo cách đồng thời và không xác định.