Przegląd

Agregacja zadań pozwala na kolejkowanie wielu zadań kolejno, zamiast przekazywania ich pojedynczo do "Handlera". Ta funkcja umożliwia grupowanie wielu kolejnych operacji w jedno, oszczędzając koszty, optymalizując pamięć podręczną lub grupując powiadomienia.

Zasada działania

Aby skorzystać z funkcji agregacji zadań, musisz umieścić zadania z tą samą nazwą grupy w tej samej kolejce. Zadania umieszczone w kolejce za pomocą tego samego pary (kolejka, grupa) zostaną zgrupowane w jedno zadanie za pomocą dostarczonego GroupAggregatora, a zgrupowane zadanie zostanie przekazane do handlera.

Podczas tworzenia zgrupowanego zadania serwer Asynq będzie oczekiwał na kolejne zadania aż do upływu konfigurowalnego okresu łaski. Za każdym razem, gdy nowe zadanie zostaje umieszczone za pomocą tego samego (kolejka, grupa), okres łaski jest aktualizowany.

Okres łaski ma konfigurowalny górny limit: możesz ustawić maksymalny czas opóźnienia agregacji, po upływie którego serwer Asynq zignoruje pozostały okres łaski i zgrupuje zadania.

Można również ustawić maksymalną liczbę zadań, które mogą być zgrupowane razem. Gdy ta liczba zostanie osiągnięta, serwer Asynq natychmiast zgrupuje zadania.

Uwaga: Harmonogramowanie zadań i ich agregacja są ze sobą sprzeczne, przy czym harmonogramowanie ma pierwszeństwo przed agregacją.

Szybki przykład

Po stronie klienta użyj opcji Queue i Group, aby dodawać zadania do tej samej grupy.

// Dodaj trzy zadania do tej samej grupy.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

Po stronie serwera dostarcz GroupAggregator, aby umożliwić agregację zadań. Możesz dostosować strategię agregacji, konfigurując GroupGracePeriod, GroupMaxDelay i GroupMaxSize.

// Ta funkcja służy do agregowania wielu zadań w jedno zadanie.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Twoja logika agregacji zadanych zadań i zwróć zadanie zagregowane.
    // ... W razie potrzeby użyj NewTask(typename, payload, opts...) do utworzenia nowego zadania i ustawienia opcji.
    // ... (Uwaga) opcja Queue zostanie zignorowana, a zadanie zagregowane zawsze zostanie dodane do tej samej kolejki co grupa.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Samouczek

W tej sekcji dostarczamy prosty program, aby zademonstrować użycie funkcji agregacji.

Najpierw stwórz program klienta zawierający poniższy kod:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Adres serwera Redis")
       flagMessage = flag.String("message", "hello", "Wiadomość do wydrukowania podczas przetwarzania zadania")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("Nie udało się dodać zadania do kolejki: %v", err)
        }
        log.Printf("Pomyślnie dodano zadanie do kolejki: %s", info.ID)
}

Możesz uruchamiać ten program wielokrotnie:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

Teraz, jeśli sprawdzisz kolejkę za pomocą interfejsu wiersza poleceń lub interfejsu użytkownika sieciowego, zobaczysz, że zadania są agregowane w kolejce.

Następnie stwórz program serwera zawierający poniższy kod:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Adres serwera Redis")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "Okres karencji grup")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "Maksymalne opóźnienie dla grup")
	flagGroupMaxSize      = flag.Int("max-size", 20, "Maksymalny rozmiar grup")
)

// Prosta funkcja agregacji.
// Łączy wiadomości wszystkich zadań w jedną, gdzie każda wiadomość zajmuje jedną linię.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("Zagregowano %d zadań z grupy %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("zadanie-zagregowane", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("Program obsługił zadanie zagregowane")
	log.Printf("Zagregowana wiadomość: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("zadanie-zagregowane", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Nie udało się uruchomić serwera: %v", err)
	}
}

Możesz uruchomić ten program i obserwować wynik:

$ go build -o server server.go
$ ./server --redis-addr=

Wynikiem powinno być zobaczenie w konsoli, że serwer zagregował zadania w grupie, a procesor przetworzył zadania zagregowane. Zachęcam do zmiany flag --grace-period, --max-delay i --max-size w powyższym programie, aby zobaczyć, jak wpływają one na strategię agregacji.