Redis 클러스터를 메시지 브로커로 사용하는 방법 소개
Redis 클러스터 사용의 장점
Redis 클러스터를 사용하면 다음과 같은 이점을 얻을 수 있습니다:
- 여러 Redis 노드에 데이터를 쉽게 분할(shard)할 수 있음
- 특정 노드의 장애 발생 시 가용성을 유지할 수 있음
- 자동 장애 조치(failover)를 수행할 수 있음
개요
Asynq는 대기열(queue)을 기반으로 데이터를 분산(shard)합니다. 위 다이어그램에서는 6개의 인스턴스(3개의 마스터 및 3개의 슬레이브)와 4개의 큐(q1, q2, q3, q4)가 있는 Redis 클러스터가 있습니다.
- 마스터1(그리고 해당 복제본, 슬레이브1)는 q1 및 q2를 호스팅합니다.
- 마스터2(그리고 해당 복제본, 슬레이브2)는 q3를 호스팅합니다.
- 마스터3(그리고 해당 복제본, 슬레이브3)는 q4를 호스팅합니다.
asynq.Client
를 사용하여 작업을 큐에 추가할 때 Queue
옵션을 사용하여 큐를 지정할 수 있습니다. 큐에 추가된 작업은 해당 큐에서 작업을 가져오는 asynq.Server
(들)에 의해 소비됩니다.
튜토리얼
이 섹션에서는 Redis Cluster를 Asynq의 메시지 브로커로 사용하는 방법을 소개합니다. 우리는 포트 7000에서 7005까지 실행 중인 6개의 Redis 인스턴스 클러스터를 가정합니다. 아래는 redis.conf
파일의 예시입니다:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
다음으로, 두 개의 이진 파일(client와 worker)을 생성할 것입니다.
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
client.go
에서는 새로운 asynq.Client
를 생성하고 RedisClusterClientOpt
를 통해 Redis Cluster에 어떻게 연결할 지를 지정할 것입니다.
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
클라이언트가 준비되면, 세 가지 다른 큐에 태스크를 생성하고 그 안에 넣을 것입니다:
- notifications
- webhooks
- images
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// 큐 이름
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// "notifications:email" 태스크를 생성하고 "notifications" 큐에 넣습니다.
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("성공적으로 큐에 등록되었습니다: %+v\n", res)
// "webhooks:sync" 태스크를 생성하고 "webhooks" 큐에 넣습니다.
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("성공적으로 큐에 등록되었습니다: %+v\n", res)
// "images:resize" 태스크를 생성하고 "images" 큐에 넣습니다.
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("성공적으로 큐에 등록되었습니다: %+v\n", res)
}
이제 우리는 이 프로그램을 실행하여 세 가지 태스크를 큐에 넣을 것입니다.
go run client/client.go
이제 worker로 이동하여 이 세 가지 태스크를 처리할 것입니다. worker.go
에서 우리는 asynq.Server
를 생성하여 이 세 가지 큐에서 태스크를 소비할 것입니다. 마찬가지로, RedisClusterClientOpt
를 사용하여 우리의 Redis Cluster에 연결할 것입니다.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// 여기에서 각 큐에 동일한 우선 순위를 설정합니다
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("서버를 시작하지 못했습니다: %v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("%d에서 %d로 이메일을 보냅니다\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("웹훅 작업 처리 중: %d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("이미지 크기 조정 중: %s\n", src)
return nil
}
이전에 만든 세 가지 작업을 처리하기 위해 이 worker 서버를 실행해 봅시다.
go run worker/worker.go
각 핸들러에서 출력된 메시지를 확인할 수 있어야 합니다.
개요에 언급된 대로, Asynq는 큐에 따라 데이터를 분할합니다. 동일한 큐에 추가된 모든 작업은 동일한 Redis 노드에 속합니다. 그래서 어떤 Redis 노드가 어떤 큐를 호스팅하고 있는지 확인할 수 있을까요?
이 질문에 대답하기 위해 CLI를 사용할 수 있습니다.
asynq queue ls --cluster
이 명령은 다음을 포함하여 큐 목록을 출력합니다:
- 큐가 속한 클러스터 노드
- 큐가 매핑된 클러스터 해시 슬롯
출력은 다음과 같이 보일 수 있습니다:
Queue Cluster KeySlot Cluster Nodes
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]