Przykład konfiguracji serwera asynq do implementacji ograniczenia przetwarzania zadań

Ta strona pokazuje, jak skonfigurować serwer asynq w celu wprowadzenia ograniczenia przetwarzania zadań.

Zauważ, że jest to ograniczenie szybkości dla każdej instancji serwera, a nie globalne ograniczenie szybkości.

W tym przykładzie użyjemy pakietu golang.org/x/time/rate do demonstracji ograniczenia szybkości. Kluczowe konfiguracje w ustawieniach inicjalizacji serwera to IsFailure i RetryDelayFunc. Utworzymy niestandardowy typ błędu i sprawdzimy typ błędu w funkcjach IsFailure i RetryDelayFunc.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{
			Concurrency:    10,
			// Do not count an error as a failure if it is due to rate limiting.
			IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
			RetryDelayFunc: retryDelay,
		},
	)

	if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
		log.Fatal(err)
	}
}

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("Osiągnięto limit szybkości (ponów za %v)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
	var ratelimitErr *RateLimitError
	if errors.As(err, &ratelimitErr) {
		return ratelimitErr.RetryIn
	}
	return asynq.DefaultRetryDelayFunc(n, err, task)
}

// Ograniczenie szybkości wynoszące 10 zdarzeń na sekundę, zezwalające na serie do 30 zdarzeń.
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
	if !limiter.Allow() {
		return &RateLimitError{
			RetryIn: time.Duration(rand.Intn(10)) * time.Second,
		}
	}
	log.Printf("[*] Obsługa zadania %s", task.Payload())
	return nil
}