Ví dụ về Cấu hình máy chủ asynq để Thực hiện Giới hạn Tốc độ Xử lý Nhiệm vụ

Trang này sẽ hướng dẫn cách cấu hình máy chủ asynq để thực hiện giới hạn tốc độ xử lý nhiệm vụ.

Xin lưu ý rằng đây là giới hạn tốc độ cho mỗi bản sao máy chủ, không phải là giới hạn tốc độ toàn cầu.

Trong ví dụ này, chúng ta sẽ sử dụng gói golang.org/x/time/rate để minh họa việc giới hạn tốc độ. Các cấu hình quan trọng trong thiết lập khởi tạo máy chủ của bạn là IsFailureRetryDelayFunc. Chúng ta sẽ tạo một loại lỗi tùy chỉnh và ép kiểu lỗi cụ thể trong các hàm IsFailureRetryDelayFunc.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{
			Concurrency:    10,
			// Không tính một lỗi là thất bại nếu nó do giới hạn tốc độ.
			IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
			RetryDelayFunc: retryDelay,
		},
	)

	if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
		log.Fatal(err)
	}
}

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("Đã đạt đến giới hạn tốc độ (thử lại sau %v)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
	var ratelimitErr *RateLimitError
	if errors.As(err, &ratelimitErr) {
		return ratelimitErr.RetryIn
	}
	return asynq.DefaultRetryDelayFunc(n, err, task)
}

// Giới hạn tốc độ là 10 sự kiện mỗi giây, cho phép tối đa 30 sự kiện trong một đợt.
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
	if !limiter.Allow() {
		return &RateLimitError{
			RetryIn: time.Duration(rand.Intn(10)) * time.Second,
		}
	}
	log.Printf("[*] Xử lý nhiệm vụ %s", task.Payload())
	return nil
}