Esempio di configurazione del server asynq per implementare il limite di velocità di elaborazione dei compiti

Questa pagina mostra come configurare il server asynq per implementare il limite di velocità di elaborazione dei compiti.

Si noti che si tratta del limite di velocità per ogni istanza del server, non un limite di velocità globale.

In questo esempio, utilizzeremo il pacchetto golang.org/x/time/rate per dimostrare il limite di velocità. Le configurazioni chiave nelle impostazioni di inizializzazione del server sono IsFailure e RetryDelayFunc. Creeremo un tipo di errore personalizzato e faremo l'assert del tipo di errore fornito nelle funzioni IsFailure e RetryDelayFunc.

pacchetto principale

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{
			Concurrency:    10,
			// Non considerare un errore come un fallimento se è dovuto al limite di velocità.
			IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
			RetryDelayFunc: retryDelay,
		},
	)

	se err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
		log.Fatal(err)
	}
}

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("Limite di velocità raggiunto (riprova tra %v)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
	var ratelimitErr *RateLimitError
	if errors.As(err, &ratelimitErr) {
		return ratelimitErr.RetryIn
	}
	restituisce asynq.DefaultRetryDelayFunc(n, err, task)
}

// Limite di velocità di 10 eventi al secondo, consentendo raffiche fino a 30 eventi.
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
	if !limiter.Allow() {
		return &RateLimitError{
			RetryIn: time.Duration(rand.Intn(10)) * time.Second,
		}
	}
	log.Printf("[*] Gestione del compito %s", task.Payload())
	return nil
}