Redis 클러스터를 메시지 브로커로 사용하는 방법 소개

Redis 클러스터 사용의 장점

Redis 클러스터를 사용하면 다음과 같은 이점을 얻을 수 있습니다:

  • 여러 Redis 노드에 데이터를 쉽게 분할(shard)할 수 있음
  • 특정 노드의 장애 발생 시 가용성을 유지할 수 있음
  • 자동 장애 조치(failover)를 수행할 수 있음

개요

클러스터 큐 일러스트

Asynq는 대기열(queue)을 기반으로 데이터를 분산(shard)합니다. 위 다이어그램에서는 6개의 인스턴스(3개의 마스터 및 3개의 슬레이브)와 4개의 큐(q1, q2, q3, q4)가 있는 Redis 클러스터가 있습니다.

  • 마스터1(그리고 해당 복제본, 슬레이브1)는 q1 및 q2를 호스팅합니다.
  • 마스터2(그리고 해당 복제본, 슬레이브2)는 q3를 호스팅합니다.
  • 마스터3(그리고 해당 복제본, 슬레이브3)는 q4를 호스팅합니다.

asynq.Client를 사용하여 작업을 큐에 추가할 때 Queue 옵션을 사용하여 큐를 지정할 수 있습니다. 큐에 추가된 작업은 해당 큐에서 작업을 가져오는 asynq.Server(들)에 의해 소비됩니다.

튜토리얼

이 섹션에서는 Redis Cluster를 Asynq의 메시지 브로커로 사용하는 방법을 소개합니다. 우리는 포트 7000에서 7005까지 실행 중인 6개의 Redis 인스턴스 클러스터를 가정합니다. 아래는 redis.conf 파일의 예시입니다:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

다음으로, 두 개의 이진 파일(client와 worker)을 생성할 것입니다.

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

client.go에서는 새로운 asynq.Client를 생성하고 RedisClusterClientOpt를 통해 Redis Cluster에 어떻게 연결할 지를 지정할 것입니다.

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

클라이언트가 준비되면, 세 가지 다른 큐에 태스크를 생성하고 그 안에 넣을 것입니다:

  • notifications
  • webhooks
  • images
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// 큐 이름
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // "notifications:email" 태스크를 생성하고 "notifications" 큐에 넣습니다.
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("성공적으로 큐에 등록되었습니다: %+v\n", res)

    // "webhooks:sync" 태스크를 생성하고 "webhooks" 큐에 넣습니다.
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("성공적으로 큐에 등록되었습니다: %+v\n", res)

    // "images:resize" 태스크를 생성하고 "images" 큐에 넣습니다.
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("성공적으로 큐에 등록되었습니다: %+v\n", res)
}

이제 우리는 이 프로그램을 실행하여 세 가지 태스크를 큐에 넣을 것입니다.

go run client/client.go

이제 worker로 이동하여 이 세 가지 태스크를 처리할 것입니다. worker.go에서 우리는 asynq.Server를 생성하여 이 세 가지 큐에서 태스크를 소비할 것입니다. 마찬가지로, RedisClusterClientOpt를 사용하여 우리의 Redis Cluster에 연결할 것입니다.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// 여기에서 각 큐에 동일한 우선 순위를 설정합니다
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("서버를 시작하지 못했습니다: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("%d에서 %d로 이메일을 보냅니다\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("웹훅 작업 처리 중: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("이미지 크기 조정 중: %s\n", src)
	return nil
}

이전에 만든 세 가지 작업을 처리하기 위해 이 worker 서버를 실행해 봅시다.

go run worker/worker.go

각 핸들러에서 출력된 메시지를 확인할 수 있어야 합니다.


개요에 언급된 대로, Asynq는 큐에 따라 데이터를 분할합니다. 동일한 큐에 추가된 모든 작업은 동일한 Redis 노드에 속합니다. 그래서 어떤 Redis 노드가 어떤 큐를 호스팅하고 있는지 확인할 수 있을까요?

이 질문에 대답하기 위해 CLI를 사용할 수 있습니다.

asynq queue ls --cluster


이 명령은 다음을 포함하여 큐 목록을 출력합니다:

- 큐가 속한 클러스터 노드
- 큐가 매핑된 클러스터 해시 슬롯

출력은 다음과 같이 보일 수 있습니다:

Queue Cluster KeySlot Cluster Nodes


images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]