Cette page présente la fonctionnalité d'agrégation de tâches d'Asynq.
Aperçu
L'agrégation de tâches vous permet de mettre en file d'attente plusieurs tâches successives, au lieu de les passer une par une à un "Gestionnaire". Cette fonctionnalité vous permet de regrouper plusieurs opérations consécutives en une seule, ce qui permet d'économiser des coûts, d'optimiser le caching ou de regrouper les notifications.
Principe de fonctionnement
Pour utiliser la fonctionnalité d'agrégation de tâches, vous devez mettre en file d'attente des tâches avec le même nom de groupe dans la même file d'attente. Les tâches mises en file d'attente en utilisant la même paire (file d'attente, groupe)
seront agrégées en une seule tâche par le GroupAggregator
que vous fournissez, et la tâche agrégée sera transmise au gestionnaire.
Lors de la création d'une tâche agrégée, le serveur Asynq attendra d'autres tâches jusqu'à ce que la période de grâce configurable expire. Chaque fois qu'une nouvelle tâche est mise en file d'attente avec la même paire (file d'attente, groupe)
, la période de grâce est mise à jour.
La période de grâce a une limite supérieure configurable : vous pouvez définir le temps maximal d'agrégation, après quoi le serveur Asynq ignorera le reste de la période de grâce et agrégera les tâches.
Vous pouvez également définir le nombre maximal de tâches pouvant être agrégées ensemble. Si ce nombre est atteint, le serveur Asynq agrégera immédiatement les tâches.
Remarque : La planification et l'agrégation de tâches sont des fonctionnalités conflictuelles, la planification ayant la priorité sur l'agrégation.
Exemple rapide
Côté client, utilisez les options Queue
et Group
pour mettre en file d'attente des tâches dans le même groupe.
// Mettre en file d'attente trois tâches dans le même groupe.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
Côté serveur, fournissez un GroupAggregator
pour activer l'agrégation des tâches. Vous pouvez personnaliser la stratégie d'agrégation en configurant GroupGracePeriod
, GroupMaxDelay
et GroupMaxSize
.
// Cette fonction est utilisée pour agréger plusieurs tâches en une seule.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... Votre logique pour agréger les tâches données et renvoyer la tâche agrégée.
// ... Si nécessaire, utilisez NewTask(typename, payload, opts...) pour créer une nouvelle tâche et définir des options.
// ... (Note) l'option Queue sera ignorée, et la tâche agrégée sera toujours mise en file dans la même file d'attente que le groupe.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
Tutoriel
Dans cette section, nous fournissons un programme simple pour démontrer l'utilisation de la fonction d'agrégation.
Tout d'abord, créez un programme client avec le code suivant :
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Adresse du serveur Redis")
flagMessage = flag.String("message", "hello", "Message à imprimer lors du traitement de la tâche")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("Échec de la mise en file d'attente de la tâche : %v", err)
}
log.Printf("Tâche mise en file d'attente avec succès : %s", info.ID)
}
Vous pouvez exécuter ce programme plusieurs fois :
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
Maintenant, si vous vérifiez la file d'attente via l'interface CLI ou Web, vous verrez des tâches être agrégées dans la file d'attente.
Ensuite, créez un programme serveur avec le code suivant :
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Adresse du serveur Redis")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "Période de grâce pour les groupes")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "Délai maximal pour les groupes")
flagGroupMaxSize = flag.Int("max-size", 20, "Taille maximale pour les groupes")
)
// Fonction d'agrégation simple.
// Combine les messages de toutes les tâches en une seule, chaque message occupant une ligne.
func agréger(groupe string, tâches []*asynq.Task) *asynq.Task {
log.Printf("Tâches agrégées %d du groupe %q", len(tâches), groupe)
var b strings.Builder
for _, t := range tâches {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("tâche-agrégée", []byte(b.String()))
}
func gérerTâcheAgrégée(ctx context.Context, tâche *asynq.Task) error {
log.Print("Le gestionnaire a reçu la tâche agrégée")
log.Printf("Message agrégé: %s", tâche.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(agréger),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("tâche-agrégée", gérerTâcheAgrégée)
if err := srv.Run(mux); err != nil {
log.Fatalf("Impossible de démarrer le serveur: %v", err)
}
}
Vous pouvez exécuter ce programme et observer la sortie :
$ go build -o server server.go
$ ./server --redis-addr=
Vous devriez pouvoir voir dans la sortie que le serveur a agrégé des tâches dans le groupe et que le processeur a traité les tâches agrégées. N'hésitez pas à essayer de modifier les drapeaux --grace-period
, --max-delay
et --max-size
dans le programme ci-dessus pour voir comment ils affectent la stratégie d'agrégation.