Пример настройки сервера asynq для реализации ограничения скорости обработки задач

На этой странице показано, как настроить сервер asynq для реализации ограничения скорости обработки задач.

Обратите внимание, что это ограничение скорости для каждого экземпляра сервера, а не глобальное ограничение скорости.

В этом примере мы будем использовать пакет golang.org/x/time/rate для демонстрации ограничения скорости. Основные настройки при инициализации вашего сервера - это IsFailure и RetryDelayFunc. Мы создадим пользовательский тип ошибки и выполним утверждение типа данной ошибки в функциях IsFailure и RetryDelayFunc.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{
			Concurrency:    10,
			// Не считать ошибку как неудачу, если это связано с ограничением скорости.
			IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
			RetryDelayFunc: retryDelay,
		},
	)

	if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
		log.Fatal(err)
	}
}

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("Достигнуто ограничение скорости (повтор через %v)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
	var ratelimitErr *RateLimitError
	if errors.As(err, &ratelimitErr) {
		return ratelimitErr.RetryIn
	}
	return asynq.DefaultRetryDelayFunc(n, err, task)
}

// Ограничение скорости 10 событий в секунду, позволяющее всплески до 30 событий.
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
	if !limiter.Allow() {
		return &RateLimitError{
			RetryIn: time.Duration(rand.Intn(10)) * time.Second,
		}
	}
	log.Printf("[*] Обработка задачи %s", task.Payload())
	return nil
}