Diese Seite stellt das Feature der Aufgabenaggregation von Asynq vor.
Übersicht
Die Aufgabenaggregation ermöglicht es Ihnen, mehrere Aufgaben in Folge in die Warteschlange zu stellen, anstatt sie einzeln an einen “Handler” zu übergeben. Mit diesem Feature können Sie mehrere aufeinanderfolgende Operationen zu einer zusammenfassen, Kosten sparen, die Zwischenspeicherung optimieren oder Benachrichtigungen bündeln.
Funktionsprinzip
Um das Feature der Aufgabenaggregation zu verwenden, müssen Sie Aufgaben mit demselben Gruppennamen in dieselbe Warteschlange stellen. Die mit demselben (Warteschlange, Gruppe)
-Paar eingereihten Aufgaben werden von dem von Ihnen bereitgestellten GroupAggregator
zu einer Aufgabe zusammengefasst und dann an den Handler übergeben.
Beim Erstellen einer zusammengefassten Aufgabe wartet der Asynq-Server auf weitere Aufgaben, bis die konfigurierbare Gnadenfrist abgelaufen ist. Jedes Mal, wenn eine neue Aufgabe mit demselben (Warteschlange, Gruppe)
-Paar eingereiht wird, wird die Gnadenfrist aktualisiert.
Die Gnadenfrist hat eine konfigurierbare Obergrenze: Sie können die maximale Aggregationsverzögerungszeit festlegen, nach der der Asynq-Server die verbleibende Gnadenfrist ignorieren und die Aufgaben zusammenfassen wird.
Sie können auch die maximale Anzahl von Aufgaben festlegen, die zusammengefasst werden können. Wenn diese Anzahl erreicht ist, aggregiert der Asynq-Server die Aufgaben unverzüglich.
Hinweis: Die Planung und Aggregation von Aufgaben sind sich widersprechende Features, wobei die Planung Vorrang vor der Aggregation hat.
Schnelles Beispiel
Auf der Client-Seite verwenden Sie die Queue
- und Group
-Optionen, um Tasks in derselben Gruppe in die Warteschlange zu stellen.
// Fügen Sie drei Tasks derselben Gruppe hinzu.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
Auf der Serverseite bieten Sie einen GroupAggregator
an, um die Task-Aggregation zu ermöglichen. Sie können die Aggregationsstrategie anpassen, indem Sie GroupGracePeriod
, GroupMaxDelay
und GroupMaxSize
konfigurieren.
// Diese Funktion wird verwendet, um mehrere Tasks in einen Task zu aggregieren.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... Ihre Logik, um die gegebenen Tasks zu aggregieren und den aggregierten Task zurückzugeben.
// ... Verwenden Sie bei Bedarf NewTask(typename, payload, opts...), um einen neuen Task zu erstellen und Optionen festzulegen.
// ... (Hinweis) Die Queue-Option wird ignoriert, und der aggregierte Task wird immer in dieselbe Warteschlange wie die Gruppe eingereiht.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
Tutorial
In diesem Abschnitt stellen wir ein einfaches Programm zur Verfügung, um die Verwendung der Aggregationsfunktion zu demonstrieren.
Erstellen Sie zunächst ein Client-Programm mit dem folgenden Code:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Adresse des Redis-Servers")
flagMessage = flag.String("message", "hallo", "Nachricht, die gedruckt werden soll, wenn der Task bearbeitet wird")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("Fehler beim Einreihen des Tasks: %v", err)
}
log.Printf("Task erfolgreich in die Warteschlange eingereiht: %s", info.ID)
}
Sie können dieses Programm mehrmals ausführen:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
Wenn Sie jetzt die Warteschlange über die Befehlszeile oder die Web-Benutzeroberfläche überprüfen, werden Sie feststellen, dass Tasks in der Warteschlange aggregiert werden.
Erstellen Sie als nächstes ein Serverprogramm mit folgendem Code:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Adresse des Redis-Servers")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "Gnadenfrist für Gruppen")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "Maximale Verzögerung für Gruppen")
flagGroupMaxSize = flag.Int("max-size", 20, "Maximale Größe für Gruppen")
)
// Einfache Aggregationsfunktion.
// Kombiniert Nachrichten aller Aufgaben zu einer einzigen, wobei jede Nachricht eine Zeile einnimmt.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("Aggregierte %d Aufgaben aus der Gruppe %q", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("aggregierte-aufgabe", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("Handler hat die aggregierte Aufgabe erhalten")
log.Printf("Aggregierte Nachricht: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("aggregierte-aufgabe", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Fehler beim Start des Servers: %v", err)
}
}
Sie können dieses Programm ausführen und die Ausgabe beobachten:
$ go build -o server server.go
$ ./server --redis-addr=
Sie sollten in der Ausgabe sehen können, dass der Server Aufgaben in der Gruppe aggregiert hat und der Prozessor die aggregierten Aufgaben verarbeitet hat. Probieren Sie gerne aus, die Flags --grace-period
, --max-delay
und --max-size
im obigen Programm zu ändern, um zu sehen, wie sie die Aggregationsstrategie beeinflussen.