asynq Serverのタスク処理レート制限の実装例

このページでは、asynqサーバーを設定して、タスク処理のレート制限を実装する方法を示します。

このレート制限は各サーバーインスタンスのためのものであり、グローバルな制限ではありません。

この例では、レート制限をデモンストレーションするために golang.org/x/time/rate パッケージを使用します。サーバーの初期化設定で重要なのは IsFailureRetryDelayFunc です。 IsFailureRetryDelayFunc 関数でカスタムエラータイプを作成し、与えられたエラーを型アサートします。

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{
			Concurrency:    10,
			// エラーがレート制限によるものでない場合、エラーを失敗としてカウントしない。
			IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
			RetryDelayFunc: retryDelay,
		},
	)

	if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
		log.Fatal(err)
	}
}

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("Rate limit reached (retry in %v)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
	var ratelimitErr *RateLimitError
	if errors.As(err, &ratelimitErr) {
		return ratelimitErr.RetryIn
	}
	return asynq.DefaultRetryDelayFunc(n, err, task)
}

// 1秒あたりのイベント制限を10に設定し、最大30のイベントを許可するバーストを可能にする。
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
	if !limiter.Allow() {
		return &RateLimitError{
			RetryIn: time.Duration(rand.Intn(10)) * time.Second,
		}
	}
	log.Printf("[*] タスク %s を処理中", task.Payload())
	return nil
}