asynq 서버 구성 예제: 작업 처리 속도 제한 구현

이 페이지에서는 asynq 서버를 구성하여 작업 처리 속도 제한을 구현하는 방법을 보여줍니다.

참고: 이는 각 서버 인스턴스의 속도 제한이며 전역 속도 제한이 아닙니다.

이 예제에서는 작업 처리 속도 제한을 보여주기 위해 golang.org/x/time/rate 패키지를 사용합니다. 서버 초기화 설정에서 주요 구성 요소는 IsFailureRetryDelayFunc입니다. IsFailureRetryDelayFunc 함수에서 사용자 정의 오류 유형을 생성하고 제공된 오류를 유형으로 단언합니다.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{
			Concurrency:    10,
			// 작업 처리 속도 제한으로 인한 오류는 실패로 간주하지 않음.
			IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
			RetryDelayFunc: retryDelay,
		},
	)

	if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
		log.Fatal(err)
	}
}

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("속도 제한 도달 (다음 시도까지 %v)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
	var ratelimitErr *RateLimitError
	if errors.As(err, &ratelimitErr) {
		return ratelimitErr.RetryIn
	}
	return asynq.DefaultRetryDelayFunc(n, err, task)
}

// 1초당 10개의 이벤트 속도 제한, 최대 30개의 버스트 허용
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
	if !limiter.Allow() {
		return &RateLimitError{
			RetryIn: time.Duration(rand.Intn(10)) * time.Second,
		}
	}
	log.Printf("[*] 작업 처리 중 %s", task.Payload())
	return nil
}