Beispiel zur Konfiguration des asynq-Servers zur Implementierung der Aufgabenverarbeitungs-Begrenzung

Diese Seite zeigt, wie der asynq-Server konfiguriert werden kann, um die Begrenzung der Aufgabenverarbeitung umzusetzen.

Bitte beachten Sie, dass dies die Begrenzung für jede Serverinstanz ist, nicht eine globale Begrenzung.

In diesem Beispiel verwenden wir das Paket golang.org/x/time/rate, um die Begrenzung der Verarbeitungsrate zu demonstrieren. Die Schlüsselkonfigurationen in den Einstellungen zur Serverinitialisierung sind IsFailure und RetryDelayFunc. Wir werden einen benutzerdefinierten Fehler-Typ erstellen und den gegebenen Fehler in den Funktionen IsFailure und RetryDelayFunc typisch überprüfen.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{
			Concurrency:    10,
			// Zähle einen Fehler nicht als Scheitern, wenn er durch die Ratebegrenzung verursacht wird.
			IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
			RetryDelayFunc: retryDelay,
		},
	)

	if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
		log.Fatal(err)
	}
}

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("Rate-Limit erreicht (erneuter Versuch in %v)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
	var ratelimitErr *RateLimitError
	if errors.As(err, &ratelimitErr) {
		return ratelimitErr.RetryIn
	}
	return asynq.DefaultRetryDelayFunc(n, err, task)
}

// Rate-Limit von 10 Ereignissen pro Sekunde, erlaubt Spitzen von bis zu 30 Ereignissen.
var limiter = rate.NewLimiter(10, 30)

func handler(ctx context.Context, task *asynq.Task) error {
	if !limiter.Allow() {
		return &RateLimitError{
			RetryIn: time.Duration(rand.Intn(10)) * time.Second,
		}
	}
	log.Printf("[*] Bearbeitung der Aufgabe %s", task.Payload())
	return nil
}