Обзор

Функция агрегирования задач позволяет добавлять несколько задач в очередь последовательно, а не передавать их по одной "обработчику". Эта функция позволяет объединять несколько последовательных операций в одну, экономя затраты, оптимизируя кэширование или пакетные уведомления.

Принцип работы

Для использования функции агрегирования задач необходимо добавлять задачи с одинаковым именем группы в одну и ту же очередь. Задачи, добавленные с использованием одной и той же пары (очередь, группа), будут сагрегированы в одну задачу при помощи предоставленного вами GroupAggregator, после чего агрегированная задача будет передана обработчику.

При создании агрегированной задачи сервер Asynq будет ждать появления новых задач до тех пор, пока не истечет настраиваемый период благоприятства. Каждый раз, когда новая задача добавляется с использованием той же пары (очередь, группа), период благоприятства обновляется.

Период благоприятства имеет настраиваемый верхний предел: вы можете задать максимальное время задержки агрегирования, после чего сервер Asynq игнорирует оставшийся период благоприятства и агрегирует задачи.

Также можно установить максимальное количество задач, которые могут быть агрегированы вместе. По достижении этого числа сервер Asynq немедленно проводит агрегацию задач.

Примечание: Планирование задач и их агрегация - конфликтующие функции, при этом планирование имеет приоритет перед агрегацией.

Быстрый пример

На стороне клиента используйте параметры Queue и Group, чтобы поместить задачи в одну группу.

// Добавляем три задачи в одну группу.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

На стороне сервера предоставьте GroupAggregator для обеспечения агрегации задач. Вы можете настраивать стратегию агрегации, устанавливая параметры GroupGracePeriod, GroupMaxDelay и GroupMaxSize.

// Эта функция используется для агрегации нескольких задач в одну.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Ваша логика агрегации задач и возврат агрегированной задачи.
    // ... При необходимости используйте NewTask(typename, payload, opts...) для создания новой задачи и установки параметров.
    // ... (Примечание) параметр Queue будет проигнорирован, а агрегированная задача всегда будет помещена в ту же очередь, что и группа.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Учебное пособие

В этом разделе мы предоставляем простую программу для демонстрации использования функции агрегации.

Сначала создайте клиентскую программу с следующим кодом:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Адрес сервера Redis")
       flagMessage = flag.String("message", "hello", "Сообщение, которое будет напечатано при выполнении задачи")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("Не удалось поместить задачу в очередь: %v", err)
        }
        log.Printf("Задача успешно помещена в очередь: %s", info.ID)
}

Вы можете запустить эту программу несколько раз:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

Теперь, если проверить очередь через интерфейс командной строки (CLI) или веб-интерфейс, вы увидите, что задачи агрегируются в очереди.

Затем создайте серверную программу с следующим кодом:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Адрес сервера Redis")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "Период 'грань-срока' для групп")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "Максимальная задержка для групп")
	flagGroupMaxSize      = flag.Int("max-size", 20, "Максимальный размер для групп")
)

// Простая функция агрегации.
// Комбинирует сообщения всех задач в одно, при этом каждое сообщение занимает одну строку.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("Агрегировано %d задач из группы %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("Обработчик получил агрегированную задачу")
	log.Printf("Агрегированное сообщение: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("aggregated-task", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Не удалось запустить сервер: %v", err)
	}
}

Вы можете запустить эту программу и наблюдать вывод:

$ go build -o server server.go
$ ./server --redis-addr=   

Вы должны видеть в выводе, что сервер агрегировал задачи в группе и процессор обработал агрегированные задачи. Не стесняйтесь изменять флаги --grace-period, --max-delay и --max-size в приведенной выше программе, чтобы увидеть, как они влияют на стратегию агрегации.