このページでは、Asynqのタスク集約機能について紹介します。
概要
タスク集約機能は、1つずつ「ハンドラ」に渡すのではなく、複数のタスクを連続してキューに入れることができます。この機能により、複数の連続した操作を1つにまとめることで、コストを節約したり、キャッシュを最適化したり、通知を一括して処理したりすることができます。
動作原理
タスク集約機能を使用するには、同じグループ名を持つタスクを同じキューに入れる必要があります。同じ(queue, group)
のペアを使用してキューに入れたタスクは、提供された**GroupAggregator
**によって1つのタスクに集約され、集約されたタスクはハンドラに渡されます。
集約されたタスクを作成する際、Asynqサーバーは設定可能な猶予期間が切れるまで、さらなるタスクを待機します。新しいタスクが同じ(queue, group)
でキューに入れられるたびに、猶予期間が更新されます。
猶予期間には設定可能な上限があります。最大集約遅延時間を設定することができ、その時間が経過すると、Asynqサーバーは残りの猶予期間を無視してタスクを集約します。
また、一緒に集約できる最大タスク数も設定できます。この数に達すると、Asynqサーバーは即座にタスクを集約します。
注意: タスクのスケジューリングと集約は競合する機能であり、スケジューリングが集約より優先されます。
クイックな例
クライアント側では、同じグループにタスクをキューイングするために Queue
と Group
オプションを使用します。
// 同じグループに3つのタスクをエンキューします。
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
サーバー側では、タスクの集約を可能にする GroupAggregator
を提供します。GroupGracePeriod
、GroupMaxDelay
、GroupMaxSize
を構成して集約戦略をカスタマイズできます。
// この関数は複数のタスクを1つのタスクに集約するために使用されます。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... 与えられたタスクを集約し、集約されたタスクを返すためのロジック
// ... 必要に応じて、NewTask(typename, payload, opts...) を使用して新しいタスクを作成し、オプションを設定します。
// ...(注意)Queue オプションは無視され、集約されたタスクは常にグループと同じキューにキューイングされます。
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
チュートリアル
このセクションでは、集約機能の使用をデモンストレーションするための簡単なプログラムを提供します。
まず、以下のコードでクライアントプログラムを作成します。
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis サーバーアドレス")
flagMessage = flag.String("message", "hello", "タスクを処理する際に出力されるメッセージ")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("タスクのエンキューに失敗しました: %v", err)
}
log.Printf("タスクを正常にエンキューしました: %s", info.ID)
}
このプログラムを複数回実行できます。
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
これで、CLI または Web UI を介してキューを確認すると、キュー内で集約されたタスクが表示されます。
次に、以下のコードでサーバープログラムを作成します。
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redisサーバーのアドレス")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "グループの猶予期間")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "グループの最大遅延時間")
flagGroupMaxSize = flag.Int("max-size", 20, "グループの最大サイズ")
)
// 単純な集計関数。
// すべてのタスクのメッセージを1つにまとめ、各メッセージを1行ずつ取り込む。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("%d 個のタスクを %q グループから集計しました", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("aggregated-task", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("ハンドラが集計されたタスクを受け取りました")
log.Printf("集計されたメッセージ: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("aggregated-task", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("サーバーの起動に失敗しました: %v", err)
}
}
このプログラムを実行し、出力を観察できます。
$ go build -o server server.go
$ ./server --redis-addr=
出力で、サーバーがグループ内のタスクを集計し、プロセッサーが集計されたタスクを処理したことが確認できるはずです。上記のプログラムで--grace-period
、--max-delay
、--max-size
フラグを変更して、それらが集計戦略にどのように影響するかを試してみてください。