Questa pagina introduce la funzionalità di aggregazione dei compiti di Asynq.

Panoramica

L'aggregazione dei compiti ti consente di accodare più compiti in successione, piuttosto che passarli uno per uno a un "gestore". Questa funzionalità ti permette di raggruppare più operazioni consecutive in un'unica, risparmiando costi, ottimizzando la memorizzazione nella cache o raggruppando le notifiche.

Principio di funzionamento

Per utilizzare la funzionalità di aggregazione dei compiti, è necessario accodare i compiti con lo stesso nome del gruppo nella stessa coda. I compiti accodati utilizzando la stessa coppia (coda, gruppo) verranno aggregati in un unico compito dal GroupAggregator che fornisci e il compito aggregato verrà passato al gestore.

Quando si crea un compito aggregato, il server Asynq attenderà ulteriori compiti fino a quando non scade il periodo di tolleranza configurabile. Ogni volta che viene accodato un nuovo compito con la stessa coppia (coda, gruppo), il periodo di tolleranza viene aggiornato.

Il periodo di tolleranza ha un limite superiore configurabile: è possibile impostare il tempo massimo di ritardo di aggregazione, dopo il quale il server Asynq ignorerà il periodo di tolleranza residuo e aggregherà i compiti.

È inoltre possibile impostare il numero massimo di compiti che possono essere aggregati insieme. Se viene raggiunto questo numero, il server Asynq aggregherà immediatamente i compiti.

Nota: La pianificazione dei compiti e l'aggregazione sono funzionalità in conflitto, con la pianificazione che ha la precedenza sull'aggregazione.

Esempio veloce

Lato client, utilizzare le opzioni Queue e Group per mettere in coda le attività nello stesso gruppo.

// Mettere in coda tre attività nello stesso gruppo.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

Lato server, fornire un GroupAggregator per abilitare l'aggregazione delle attività. È possibile personalizzare la strategia di aggregazione configurando GroupGracePeriod, GroupMaxDelay e GroupMaxSize.

// Questa funzione è utilizzata per aggregare più attività in un'unica attività.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... La tua logica per aggregare le attività date e restituire l'attività aggregata.
    // ... Se necessario, utilizza NewTask(typename, payload, opts...) per creare una nuova attività e impostare le opzioni.
    // ... (Nota) l'opzione Queue verrà ignorata e l'attività aggregata verrà sempre messa in coda alla stessa coda del gruppo.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Tutorial

In questa sezione, forniamo un programma semplice per dimostrare l'uso della funzionalità di aggregazione.

Innanzitutto, crea un programma client con il seguente codice:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Indirizzo del server Redis")
       flagMessage = flag.String("message", "ciao", "Messaggio da stampare durante l'elaborazione dell'attività")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregazione-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("esempio-gruppo"))
        if err != nil {
                log.Fatalf("Impossibile mettere in coda l'attività: %v", err)
        }
        log.Printf("Attività messa in coda con successo: %s", info.ID)
}

Puoi eseguire questo programma più volte:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=ciao --redis-addr=
$ ./client --message=addio --redis-addr=

Ora, se controlli la coda tramite CLI o Web UI, vedrai le attività essere aggregate nella coda.

Successivamente, crea un programma server con il seguente codice:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr        = flag.String("redis-addr", "localhost:6379", "Indirizzo del server Redis")
	flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "Periodo di tolleranza per i gruppi")
	flagGroupMaxDelay    = flag.Duration("max-delay", 30*time.Second, "Ritardo massimo per i gruppi")
	flagGroupMaxSize     = flag.Int("max-size", 20, "Dimensione massima per i gruppi")
)

// Funzione di aggregazione semplice.
// Combina i messaggi di tutti i task in uno solo, con ogni messaggio che occupa una riga.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("Aggregati %d task dal gruppo %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("task-aggregato", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("Il gestore ha ricevuto il task aggregato")
	log.Printf("Messaggio aggregato: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("task-aggregato", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Impossibile avviare il server: %v", err)
	}
}

Puoi eseguire questo programma e osservare l'output:

$ go build -o server server.go
$ ./server --redis-addr=

Nell'output dovresti vedere che il server ha aggregato i task nel gruppo e il processore ha elaborato i task aggregati. Sentiti libero di provare a modificare i flag --grace-period, --max-delay e --max-size nel programma sopra indicato per vedere come influiscono sulla strategia di aggregazione.