Diese Seite stellt das Feature der Aufgabenaggregation von Asynq vor.

Übersicht

Die Aufgabenaggregation ermöglicht es Ihnen, mehrere Aufgaben in Folge in die Warteschlange zu stellen, anstatt sie einzeln an einen “Handler” zu übergeben. Mit diesem Feature können Sie mehrere aufeinanderfolgende Operationen zu einer zusammenfassen, Kosten sparen, die Zwischenspeicherung optimieren oder Benachrichtigungen bündeln.

Funktionsprinzip

Um das Feature der Aufgabenaggregation zu verwenden, müssen Sie Aufgaben mit demselben Gruppennamen in dieselbe Warteschlange stellen. Die mit demselben (Warteschlange, Gruppe)-Paar eingereihten Aufgaben werden von dem von Ihnen bereitgestellten GroupAggregator zu einer Aufgabe zusammengefasst und dann an den Handler übergeben.

Beim Erstellen einer zusammengefassten Aufgabe wartet der Asynq-Server auf weitere Aufgaben, bis die konfigurierbare Gnadenfrist abgelaufen ist. Jedes Mal, wenn eine neue Aufgabe mit demselben (Warteschlange, Gruppe)-Paar eingereiht wird, wird die Gnadenfrist aktualisiert.

Die Gnadenfrist hat eine konfigurierbare Obergrenze: Sie können die maximale Aggregationsverzögerungszeit festlegen, nach der der Asynq-Server die verbleibende Gnadenfrist ignorieren und die Aufgaben zusammenfassen wird.

Sie können auch die maximale Anzahl von Aufgaben festlegen, die zusammengefasst werden können. Wenn diese Anzahl erreicht ist, aggregiert der Asynq-Server die Aufgaben unverzüglich.

Hinweis: Die Planung und Aggregation von Aufgaben sind sich widersprechende Features, wobei die Planung Vorrang vor der Aggregation hat.

Schnelles Beispiel

Auf der Client-Seite verwenden Sie die Queue- und Group-Optionen, um Tasks in derselben Gruppe in die Warteschlange zu stellen.

// Fügen Sie drei Tasks derselben Gruppe hinzu.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

Auf der Serverseite bieten Sie einen GroupAggregator an, um die Task-Aggregation zu ermöglichen. Sie können die Aggregationsstrategie anpassen, indem Sie GroupGracePeriod, GroupMaxDelay und GroupMaxSize konfigurieren.

// Diese Funktion wird verwendet, um mehrere Tasks in einen Task zu aggregieren.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Ihre Logik, um die gegebenen Tasks zu aggregieren und den aggregierten Task zurückzugeben.
    // ... Verwenden Sie bei Bedarf NewTask(typename, payload, opts...), um einen neuen Task zu erstellen und Optionen festzulegen.
    // ... (Hinweis) Die Queue-Option wird ignoriert, und der aggregierte Task wird immer in dieselbe Warteschlange wie die Gruppe eingereiht.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Tutorial

In diesem Abschnitt stellen wir ein einfaches Programm zur Verfügung, um die Verwendung der Aggregationsfunktion zu demonstrieren.

Erstellen Sie zunächst ein Client-Programm mit dem folgenden Code:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Adresse des Redis-Servers")
       flagMessage = flag.String("message", "hallo", "Nachricht, die gedruckt werden soll, wenn der Task bearbeitet wird")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("Fehler beim Einreihen des Tasks: %v", err)
        }
        log.Printf("Task erfolgreich in die Warteschlange eingereiht: %s", info.ID)
}

Sie können dieses Programm mehrmals ausführen:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

Wenn Sie jetzt die Warteschlange über die Befehlszeile oder die Web-Benutzeroberfläche überprüfen, werden Sie feststellen, dass Tasks in der Warteschlange aggregiert werden.

Erstellen Sie als nächstes ein Serverprogramm mit folgendem Code:

// server.go
package main

import (
    "context"
    "flag"
    "log"
    "strings"
    "time"

    "github.com/hibiken/asynq"
)

var (
    flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Adresse des Redis-Servers")
    flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "Gnadenfrist für Gruppen")
    flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "Maximale Verzögerung für Gruppen")
    flagGroupMaxSize      = flag.Int("max-size", 20, "Maximale Größe für Gruppen")
)

// Einfache Aggregationsfunktion.
// Kombiniert Nachrichten aller Aufgaben zu einer einzigen, wobei jede Nachricht eine Zeile einnimmt.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    log.Printf("Aggregierte %d Aufgaben aus der Gruppe %q", len(tasks), group)
    var b strings.Builder
    for _, t := range tasks {
        b.Write(t.Payload())
        b.WriteString("\n")
    }
    return asynq.NewTask("aggregierte-aufgabe", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
    log.Print("Handler hat die aggregierte Aufgabe erhalten")
    log.Printf("Aggregierte Nachricht: %s", task.Payload())
    return nil
}

func main() {
    flag.Parse()

    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: *flagRedisAddr},
        asynq.Config{
            Queues:           map[string]int{"tutorial": 1},
            GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
            GroupGracePeriod: *flagGroupGracePeriod,
            GroupMaxDelay:    *flagGroupMaxDelay,
            GroupMaxSize:     *flagGroupMaxSize,
        },
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("aggregierte-aufgabe", handleAggregatedTask)

    if err := srv.Run(mux); err != nil {
        log.Fatalf("Fehler beim Start des Servers: %v", err)
    }
}

Sie können dieses Programm ausführen und die Ausgabe beobachten:

$ go build -o server server.go
$ ./server --redis-addr=

Sie sollten in der Ausgabe sehen können, dass der Server Aufgaben in der Gruppe aggregiert hat und der Prozessor die aggregierten Aufgaben verarbeitet hat. Probieren Sie gerne aus, die Flags --grace-period, --max-delay und --max-size im obigen Programm zu ändern, um zu sehen, wie sie die Aggregationsstrategie beeinflussen.