このページでは、AsynqでRedis Clusterをメッセージブローカーとして使用する方法について紹介します。
Redis Clusterの利点
Redis Clusterを使用することで、以下の利点が得られます:
- 複数のRedisノード間でデータを簡単にシャーディングできます
- 特定のノードの故障が発生した場合でも可用性を維持できます
- 自動的にフェイルオーバーを実行します
概要
Asynqはキューに基づいてデータをシャーディングします。上記の図では、6つのインスタンス(3つのマスターと3つのスレーブ)および4つのキュー(q1、q2、q3、q4)を持つRedis Clusterがあります。
- マスター1(およびそのレプリカであるスレーブ1)はq1とq2をホストします。
- マスター2(およびそのレプリカであるスレーブ2)はq3をホストします。
- マスター3(およびそのレプリカであるスレーブ3)はq4をホストします。
asynq.Client
を使用してタスクをキューにエンキューする際、Queue
オプションを使用してキューを指定できます。エンキューされたタスクは、これらのキューからタスクを取得するasynq.Server
(s)によって消費されます。
チュートリアル
このセクションでは、AsynqのメッセージブローカーとしてRedis Clusterを使用する方法を紹介します。6つのRedisインスタンスが7000から7005のポートで実行されているクラスターがあると仮定します。以下はredis.conf
ファイルの例です:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
次に、クライアントとワーカーの2つのバイナリファイルを作成します。
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
client.go
では、新しいasynq.Client
を作成し、RedisClusterClientOpt
を渡すことでRedis Clusterに接続する方法を指定します。
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
クライアントができたら、タスクを作成し、それらを3つの異なるキューにエンキューします:
- notifications
- webhooks
- images
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// キュー名
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// "notifications:email"タスクを作成し、"notifications"キューにエンキューします。
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("正常にエンキューされました: %+v\n", res)
// "webhooks:sync"タスクを作成し、"webhooks"キューにエンキューします。
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("正常にエンキューされました: %+v\n", res)
// "images:resize"タスクを作成し、"images"キューにエンキューします。
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("正常にエンキューされました: %+v\n", res)
}
このプログラムを実行して、3つのタスクをキューにエンキューしましょう。
go run client/client.go
さて、workerに移動してこれら3つのタスクを処理しましょう。worker.go
では、asynq.Server
を作成して、これら3つのキューからタスクを消費します。同様に、RedisClusterClientOpt
を使用してRedis Clusterに接続します。
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// 各キューに同じ優先順位を設定します
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("サーバの起動に失敗しました:%v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("%dから%dへのメールの送信\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("Webhookタスクの処理:%d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("画像のリサイズ:%s\n", src)
return nil
}
このワーカーサーバーを実行して、先ほど作成した3つのタスクを処理します。
go run worker/worker.go
各ハンドラからプリントされたメッセージが表示されるはずです。
前述の概要に記載されているように、Asynqはキューに基づいてデータをシャードします。同じキューに追加されたすべてのタスクは同じRedisノードに属します。では、どのRedisノードがどのキューをホストしていますか?
この質問に答えるためにCLIを使用することができます。
asynq queue ls --cluster
このコマンドは、キューのリストを出力し、以下を含みます:
- キューが属するクラスターノード
- キューがマップされるクラスターハッシュスロット
出力は次のようになるかもしれません:
Queue Cluster KeySlot Cluster Nodes
----- --------------- -------------
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]