このページでは、AsynqでRedis Clusterをメッセージブローカーとして使用する方法について紹介します。

Redis Clusterの利点

Redis Clusterを使用することで、以下の利点が得られます:

  • 複数のRedisノード間でデータを簡単にシャーディングできます
  • 特定のノードの故障が発生した場合でも可用性を維持できます
  • 自動的にフェイルオーバーを実行します

概要

クラスターキューのイラスト

Asynqはキューに基づいてデータをシャーディングします。上記の図では、6つのインスタンス(3つのマスターと3つのスレーブ)および4つのキュー(q1、q2、q3、q4)を持つRedis Clusterがあります。

  • マスター1(およびそのレプリカであるスレーブ1)はq1とq2をホストします。
  • マスター2(およびそのレプリカであるスレーブ2)はq3をホストします。
  • マスター3(およびそのレプリカであるスレーブ3)はq4をホストします。

asynq.Clientを使用してタスクをキューにエンキューする際、Queueオプションを使用してキューを指定できます。エンキューされたタスクは、これらのキューからタスクを取得するasynq.Server(s)によって消費されます。

チュートリアル

このセクションでは、AsynqのメッセージブローカーとしてRedis Clusterを使用する方法を紹介します。6つのRedisインスタンスが7000から7005のポートで実行されているクラスターがあると仮定します。以下はredis.confファイルの例です:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

次に、クライアントとワーカーの2つのバイナリファイルを作成します。

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

client.goでは、新しいasynq.Clientを作成し、RedisClusterClientOptを渡すことでRedis Clusterに接続する方法を指定します。

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

クライアントができたら、タスクを作成し、それらを3つの異なるキューにエンキューします:

  • notifications
  • webhooks
  • images
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// キュー名
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // "notifications:email"タスクを作成し、"notifications"キューにエンキューします。
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("正常にエンキューされました: %+v\n", res)

    // "webhooks:sync"タスクを作成し、"webhooks"キューにエンキューします。
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("正常にエンキューされました: %+v\n", res)

    // "images:resize"タスクを作成し、"images"キューにエンキューします。
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("正常にエンキューされました: %+v\n", res)
}

このプログラムを実行して、3つのタスクをキューにエンキューしましょう。

go run client/client.go

さて、workerに移動してこれら3つのタスクを処理しましょう。worker.goでは、asynq.Serverを作成して、これら3つのキューからタスクを消費します。同様に、RedisClusterClientOptを使用してRedis Clusterに接続します。

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// 各キューに同じ優先順位を設定します
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("サーバの起動に失敗しました:%v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("%dから%dへのメールの送信\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Webhookタスクの処理:%d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("画像のリサイズ:%s\n", src)
	return nil
}

このワーカーサーバーを実行して、先ほど作成した3つのタスクを処理します。

go run worker/worker.go

各ハンドラからプリントされたメッセージが表示されるはずです。

前述の概要に記載されているように、Asynqはキューに基づいてデータをシャードします。同じキューに追加されたすべてのタスクは同じRedisノードに属します。では、どのRedisノードがどのキューをホストしていますか?

この質問に答えるためにCLIを使用することができます。

asynq queue ls --cluster

このコマンドは、キューのリストを出力し、以下を含みます:

  • キューが属するクラスターノード
  • キューがマップされるクラスターハッシュスロット

出力は次のようになるかもしれません:

Queue          Cluster KeySlot  Cluster Nodes
-----          ---------------  -------------
images         9450              [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks       4418              [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications  16340             [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]