Пример настройки сервера asynq для реализации ограничения скорости обработки задач
На этой странице показано, как настроить сервер asynq для реализации ограничения скорости обработки задач.
Обратите внимание, что это ограничение скорости для каждого экземпляра сервера, а не глобальное ограничение скорости.
В этом примере мы будем использовать пакет golang.org/x/time/rate
для демонстрации ограничения скорости. Основные настройки при инициализации вашего сервера - это IsFailure
и RetryDelayFunc
. Мы создадим пользовательский тип ошибки и выполним утверждение типа данной ошибки в функциях IsFailure
и RetryDelayFunc
.
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"time"
"golang.org/x/time/rate"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{
Concurrency: 10,
// Не считать ошибку как неудачу, если это связано с ограничением скорости.
IsFailure: func(err error) bool { return !IsRateLimitError(err) },
RetryDelayFunc: retryDelay,
},
)
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
type RateLimitError struct {
RetryIn time.Duration
}
func (e *RateLimitError) Error() string {
return fmt.Sprintf("Достигнуто ограничение скорости (повтор через %v)", e.RetryIn)
}
func IsRateLimitError(err error) bool {
_, ok := err.(*RateLimitError)
return ok
}
func retryDelay(n int, err error, task *asynq.Task) time.Duration {
var ratelimitErr *RateLimitError
if errors.As(err, &ratelimitErr) {
return ratelimitErr.RetryIn
}
return asynq.DefaultRetryDelayFunc(n, err, task)
}
// Ограничение скорости 10 событий в секунду, позволяющее всплески до 30 событий.
var limiter = rate.NewLimiter(10, 30)
func handler(ctx context.Context, task *asynq.Task) error {
if !limiter.Allow() {
return &RateLimitError{
RetryIn: time.Duration(rand.Intn(10)) * time.Second,
}
}
log.Printf("[*] Обработка задачи %s", task.Payload())
return nil
}