Trang này giới thiệu cách sử dụng Redis Cluster làm trình xử lý tin nhắn trong Asynq.

Ưu điểm của việc sử dụng Redis Cluster

Thông qua việc sử dụng Redis Cluster, bạn có thể đạt được những ưu điểm sau:

  • Dễ dàng chia dữ liệu qua nhiều nút Redis
  • Bảo đảm khả năng sẵn có trong trường hợp một số nút bị lỗi
  • Tự động thực hiện failover

Tổng quan

Hình minh họa hàng đợi Cluster

Asynq chia dữ liệu dựa trên các hàng đợi. Trong sơ đồ trên, chúng tôi có một Redis Cluster với 6 phiên bản (3 bản chính và 3 bản sao) và 4 hàng đợi (q1, q2, q3, q4).

  • Master1 (và bản sao của nó, Slave1) chứa q1 và q2.
  • Master2 (và bản sao của nó, Slave2) chứa q3.
  • Master3 (và bản sao của nó, Slave3) chứa q4.

Khi sử dụng asynq.Client để đưa các nhiệm vụ vào hàng đợi, bạn có thể chỉ định hàng đợi bằng cách sử dụng tùy chọn Queue. Các nhiệm vụ được đưa vào hàng đợi sẽ được tiêu thụ bởi asynq.Server(s) kéo nhiệm vụ từ những hàng đợi này.

Hướng dẫn

Trong phần này, chúng ta sẽ giới thiệu cách sử dụng Redis Cluster như là môi trường trung chuyển tin nhắn cho Asynq. Chúng ta giả sử rằng bạn đã có một cụm 6 instance Redis đang chạy trên các cổng 7000-7005. Dưới đây là một ví dụ về tệp redis.conf:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000 
appendonly yes

Tiếp theo, chúng ta sẽ tạo hai tệp nhị phân: client và worker.

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

Trong client.go, chúng ta sẽ tạo một asynq.Client mới và chỉ định cách kết nối đến Redis Cluster bằng cách truyền RedisClusterClientOpt.

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

Khi đã có client, chúng ta sẽ tạo các nhiệm vụ và đưa chúng vào ba hàng đợi khác nhau:

  • notifications
  • webhooks
  • images
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// Tên hàng đợi
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // Tạo một nhiệm vụ "notifications:email" và đưa vào hàng đợi "notifications".
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Đưa vào hàng đợi thành công: %+v\n", res)

    // Tạo một nhiệm vụ "webhooks:sync" và đưa vào hàng đợi "webhooks".
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Đưa vào hàng đợi thành công: %+v\n", res)

    // Tạo một nhiệm vụ "images:resize" và đưa vào hàng đợi "images".
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "đường/dẫn/tới/hình/ảnh"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Đưa vào hàng đợi thành công: %+v\n", res)
}

Bây giờ, chúng ta hãy chạy chương trình này để đưa ba nhiệm vụ vào các hàng đợi.

go run client/client.go

Bây giờ, hãy chuyển đến worker để xử lý ba nhiệm vụ này. Trong worker.go, chúng ta sẽ tạo một asynq.Server để tiêu thụ nhiệm vụ từ ba hàng đợi này. Tương tự, chúng ta sẽ kết nối đến Redis Cluster của chúng ta bằng cách sử dụng RedisClusterClientOpt.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// Đặt cùng mức độ ưu tiên cho mỗi hàng đợi ở đây
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("Không thể khởi động máy chủ: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("Đang gửi email từ %d đến %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Xử lý công việc Webhook: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("Đang thay đổi kích thước ảnh: %s\n", src)
	return nil
}

Hãy chạy máy chủ worker này để xử lý ba công việc chúng ta đã tạo trước đó.

go run worker/worker.go

Bạn nên thấy các tin nhắn được in từ mỗi trình xử lý.

Như đã đề cập trong phần tổng quan, Asynq phân mảnh dữ liệu dựa trên hàng đợi. Tất cả các công việc được thêm vào cùng một hàng đợi thuộc cùng một nút Redis. Vậy, nút Redis nào chứa hàng đợi nào?

Chúng ta có thể sử dụng CLI để trả lời câu hỏi này.

asynq queue ls --cluster

Lệnh này sẽ in danh sách các hàng đợi, cùng với:

  • Nút cụm mà hàng đợi thuộc về
  • Khe băm cụm mà hàng đợi được ánh xạ

Kết quả có thể trông như sau:

Queue          Cluster KeySlot  Cluster Nodes
-----          ---------------  -------------
images         9450              [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks       4418              [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications  16340             [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]