Questa pagina introduce la funzionalità di aggregazione dei compiti di Asynq.
Panoramica
L'aggregazione dei compiti ti consente di accodare più compiti in successione, piuttosto che passarli uno per uno a un "gestore". Questa funzionalità ti permette di raggruppare più operazioni consecutive in un'unica, risparmiando costi, ottimizzando la memorizzazione nella cache o raggruppando le notifiche.
Principio di funzionamento
Per utilizzare la funzionalità di aggregazione dei compiti, è necessario accodare i compiti con lo stesso nome del gruppo nella stessa coda. I compiti accodati utilizzando la stessa coppia (coda, gruppo)
verranno aggregati in un unico compito dal GroupAggregator
che fornisci e il compito aggregato verrà passato al gestore.
Quando si crea un compito aggregato, il server Asynq attenderà ulteriori compiti fino a quando non scade il periodo di tolleranza configurabile. Ogni volta che viene accodato un nuovo compito con la stessa coppia (coda, gruppo)
, il periodo di tolleranza viene aggiornato.
Il periodo di tolleranza ha un limite superiore configurabile: è possibile impostare il tempo massimo di ritardo di aggregazione, dopo il quale il server Asynq ignorerà il periodo di tolleranza residuo e aggregherà i compiti.
È inoltre possibile impostare il numero massimo di compiti che possono essere aggregati insieme. Se viene raggiunto questo numero, il server Asynq aggregherà immediatamente i compiti.
Nota: La pianificazione dei compiti e l'aggregazione sono funzionalità in conflitto, con la pianificazione che ha la precedenza sull'aggregazione.
Esempio veloce
Lato client, utilizzare le opzioni Queue
e Group
per mettere in coda le attività nello stesso gruppo.
// Mettere in coda tre attività nello stesso gruppo.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
Lato server, fornire un GroupAggregator
per abilitare l'aggregazione delle attività. È possibile personalizzare la strategia di aggregazione configurando GroupGracePeriod
, GroupMaxDelay
e GroupMaxSize
.
// Questa funzione è utilizzata per aggregare più attività in un'unica attività.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... La tua logica per aggregare le attività date e restituire l'attività aggregata.
// ... Se necessario, utilizza NewTask(typename, payload, opts...) per creare una nuova attività e impostare le opzioni.
// ... (Nota) l'opzione Queue verrà ignorata e l'attività aggregata verrà sempre messa in coda alla stessa coda del gruppo.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
Tutorial
In questa sezione, forniamo un programma semplice per dimostrare l'uso della funzionalità di aggregazione.
Innanzitutto, crea un programma client con il seguente codice:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Indirizzo del server Redis")
flagMessage = flag.String("message", "ciao", "Messaggio da stampare durante l'elaborazione dell'attività")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregazione-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("esempio-gruppo"))
if err != nil {
log.Fatalf("Impossibile mettere in coda l'attività: %v", err)
}
log.Printf("Attività messa in coda con successo: %s", info.ID)
}
Puoi eseguire questo programma più volte:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=ciao --redis-addr=
$ ./client --message=addio --redis-addr=
Ora, se controlli la coda tramite CLI o Web UI, vedrai le attività essere aggregate nella coda.
Successivamente, crea un programma server con il seguente codice:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Indirizzo del server Redis")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "Periodo di tolleranza per i gruppi")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "Ritardo massimo per i gruppi")
flagGroupMaxSize = flag.Int("max-size", 20, "Dimensione massima per i gruppi")
)
// Funzione di aggregazione semplice.
// Combina i messaggi di tutti i task in uno solo, con ogni messaggio che occupa una riga.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("Aggregati %d task dal gruppo %q", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("task-aggregato", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("Il gestore ha ricevuto il task aggregato")
log.Printf("Messaggio aggregato: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("task-aggregato", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Impossibile avviare il server: %v", err)
}
}
Puoi eseguire questo programma e osservare l'output:
$ go build -o server server.go
$ ./server --redis-addr=
Nell'output dovresti vedere che il server ha aggregato i task nel gruppo e il processore ha elaborato i task aggregati. Sentiti libero di provare a modificare i flag --grace-period
, --max-delay
e --max-size
nel programma sopra indicato per vedere come influiscono sulla strategia di aggregazione.