Ejemplo de configuración del servidor asynq para implementar la limitación de velocidad de procesamiento de tareas

Esta página muestra cómo configurar el servidor asynq para implementar la limitación de velocidad de procesamiento de tareas.

Por favor, tenga en cuenta que esta es la limitación de velocidad para cada instancia del servidor, no una limitación de velocidad global.

En este ejemplo, utilizaremos el paquete golang.org/x/time/rate para demostrar la limitación de velocidad. Las configuraciones clave en la inicialización del servidor son IsFailure y RetryDelayFunc. Crearemos un tipo de error personalizado y realizaremos una aserción de tipo en el error dado en las funciones IsFailure y RetryDelayFunc.

paquete principal

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{
			Concurrency:    10,
			// No contar un error como un fallo si se debe a la limitación de velocidad.
			IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
			RetryDelayFunc: retryDelay,
		},
	)

	if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
		log.Fatal(err)
	}
}

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("Límite de velocidad alcanzado (reintentar en %v)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
	var ratelimitErr *RateLimitError
	if errors.As(err, &ratelimitErr) {
		return ratelimitErr.RetryIn
	}
	return asynq.DefaultRetryDelayFunc(n, err, task)
}

// Límite de velocidad de 10 eventos por segundo, permitiendo ráfagas de hasta 30 eventos.
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
	if !limiter.Allow() {
		return &RateLimitError{
			RetryIn: time.Duration(rand.Intn(10)) * time.Second,
		}
	}
	log.Printf("[*] Manejando la tarea %s", task.Payload())
	return nil
}