このページでは、Asynqのタスク集約機能について紹介します。

概要

タスク集約機能は、1つずつ「ハンドラ」に渡すのではなく、複数のタスクを連続してキューに入れることができます。この機能により、複数の連続した操作を1つにまとめることで、コストを節約したり、キャッシュを最適化したり、通知を一括して処理したりすることができます。

動作原理

タスク集約機能を使用するには、同じグループ名を持つタスクを同じキューに入れる必要があります。同じ(queue, group)のペアを使用してキューに入れたタスクは、提供された**GroupAggregator**によって1つのタスクに集約され、集約されたタスクはハンドラに渡されます。

集約されたタスクを作成する際、Asynqサーバーは設定可能な猶予期間が切れるまで、さらなるタスクを待機します。新しいタスクが同じ(queue, group)でキューに入れられるたびに、猶予期間が更新されます。

猶予期間には設定可能な上限があります。最大集約遅延時間を設定することができ、その時間が経過すると、Asynqサーバーは残りの猶予期間を無視してタスクを集約します。

また、一緒に集約できる最大タスク数も設定できます。この数に達すると、Asynqサーバーは即座にタスクを集約します。

注意: タスクのスケジューリングと集約は競合する機能であり、スケジューリングが集約より優先されます。

クイックな例

クライアント側では、同じグループにタスクをキューイングするために QueueGroup オプションを使用します。

// 同じグループに3つのタスクをエンキューします。
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

サーバー側では、タスクの集約を可能にする GroupAggregator を提供します。GroupGracePeriodGroupMaxDelayGroupMaxSize を構成して集約戦略をカスタマイズできます。

// この関数は複数のタスクを1つのタスクに集約するために使用されます。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... 与えられたタスクを集約し、集約されたタスクを返すためのロジック
    // ... 必要に応じて、NewTask(typename, payload, opts...) を使用して新しいタスクを作成し、オプションを設定します。
    // ...(注意)Queue オプションは無視され、集約されたタスクは常にグループと同じキューにキューイングされます。
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

チュートリアル

このセクションでは、集約機能の使用をデモンストレーションするための簡単なプログラムを提供します。

まず、以下のコードでクライアントプログラムを作成します。

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis サーバーアドレス")
       flagMessage = flag.String("message", "hello", "タスクを処理する際に出力されるメッセージ")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("タスクのエンキューに失敗しました: %v", err)
        }
        log.Printf("タスクを正常にエンキューしました: %s", info.ID)
}

このプログラムを複数回実行できます。

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

これで、CLI または Web UI を介してキューを確認すると、キュー内で集約されたタスクが表示されます。

次に、以下のコードでサーバープログラムを作成します。

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Redisサーバーのアドレス")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "グループの猶予期間")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "グループの最大遅延時間")
	flagGroupMaxSize      = flag.Int("max-size", 20, "グループの最大サイズ")
)

// 単純な集計関数。
// すべてのタスクのメッセージを1つにまとめ、各メッセージを1行ずつ取り込む。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("%d 個のタスクを %q グループから集計しました", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("ハンドラが集計されたタスクを受け取りました")
	log.Printf("集計されたメッセージ: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("aggregated-task", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("サーバーの起動に失敗しました: %v", err)
	}
}

このプログラムを実行し、出力を観察できます。

$ go build -o server server.go
$ ./server --redis-addr=

出力で、サーバーがグループ内のタスクを集計し、プロセッサーが集計されたタスクを処理したことが確認できるはずです。上記のプログラムで--grace-period--max-delay--max-sizeフラグを変更して、それらが集計戦略にどのように影響するかを試してみてください。