หน้านี้นำเสนอวิธีการใช้ Redis Cluster เป็นตัวจัดการข้อความใน Asynq
ข้อดีของการใช้ Redis Cluster
การใช้ Redis Cluster จะทำให้คุณได้รับประโยชน์ต่อไปนี้:
- การแบ่งข้อมูลได้อย่างง่ายในหลายๆ โหนดของ Redis
- รักษาความพร้อมใช้งานในกรณีเกิดความล้มเหลวของโหนดบางตัว
- ดำเนินการ Failover โดยอัตโนมัติ
ภาพรวม
Asynq แบ่งข้อมูลลงตามคิว ในแผนภาพข้างต้น เรามี Redis Cluster ที่มี 6 อินสแตนซ์ (3 ตัวเป็น Master และ 3 ตัวเป็น Slave) และมี 4 คิว (q1, q2, q3, q4)
- Master1 (และ Slave1 ที่เป็นตัวคัมทำ) รองรับ q1 และ q2
- Master2 (และ Slave2 ที่เป็นตัวคัมทำ) รองรับ q3
- Master3 (และ Slave3 ที่เป็นตัวคัมทำ) รองรับ q4
เมื่อคุณใช้ asynq.Client
เพื่อนำงานเข้าคิว คุณสามารถระบุคิวโดยใช้ ตัวเลือก Queue
งานที่นำเข้าคิวจะถูกโดนใช้งานโดย asynq.Server
(s) ที่ดึงงานจากคิวเหล่านี้
รายการสอน
ในส่วนนี้ เราจะแนะนำวิธีการใช้ Redis Cluster เป็นตัวกำหนดข้อความสำหรับ Asynq ซึ่งเราสมมติว่าคุณมีคลัสเตอร์ของ 6 รายการ Redis ที่ทำงานอยู่บนพอร์ต 7000-7005 ด้านล่างนี้เป็นตัวอย่างของไฟล์ redis.conf
:
พอร์ต 7000
cluster-enabled ใช่
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly ใช่
ต่อมา เราจะสร้างไฟล์ซอร์สที่ลีบิลคลิกสองไฟล์: ไคลเอ็นต์และเวิร์กเกอร์
ไปโมดอินิตอฉัน asynq-redis-cluster-quickstart
mkdir ไคลเอ็นต์ เวิร์กเกอร์
สถาตการ์ไคลเอ็นต์/ไคลเอ็นต์โก
สถาตการ์เวิร์กเกอร์/เวิร์กเกอร์โก
ใน client.go
เราจะสร้าง asynq.Client
ใหม่และระบุว่าจะเชื่อมต่อกับ Redis Cluster โดยการส่ง RedisClusterClientOpt
.
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
เมื่อเรามีไคลเอ็นต์แล้ว เราจะสร้างงานและคิวที่ต่างกันเข้าคิวทั้งสาม:
- การแจ้งเตือน
- เว็บฮุก
- รูปภาพ
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// ชื่อคิว
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// สร้างงาน "notifications:email" และส่งเข้าคิว "notifications"
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("เข้าคิวเรียบร้อย: %+v\n", res)
// สร้างงาน "webhooks:sync" และส่งเข้าคิว "webhooks"
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("เข้าคิวเรียบร้อย: %+v\n", res)
// สร้างงาน "images:resize" และส่งเข้าคิว "images"
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "ทางบาง/ไปยัง/รูปภาพ"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("เข้าคิวเรียบร้อย: %+v\n", res)
}
ให้เรามารันโปรแกรมนี้เพื่อเข้าคิวงานสามชิ้นเข้าไปในคิว
ไปรันไคลเอ็นต์/ไคลเอ็นต์โก
ตอนนี้ เรามาดูที่เวิร์กเกอร์เพื่อประมวลผลงานสามชิ้นเหล่านี้ ใน worker.go
เราจะสร้าง asynq.Server
เพื่อบริโภคงานจากคิวทั้งสามเหล่านี้ ในทำนองเดียวกัน เราจะเชื่อมต่อกับ Redis Cluster ของเราโดยใช้ RedisClusterClientOpt
.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// ตั้งค่าความสำคัญเท่ากันสำหรับแต่ละคิวที่นี่
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("เริ่มเซิร์ฟเวอร์ไม่สำเร็จ: %v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("ส่งอีเมลจาก %d ถึง %d\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("การจัดการงาน Webhook: %d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("ปรับขนาดรูปภาพ: %s\n", src)
return nil
}
ลองรันเซิร์ฟเวอร์เวิร์คเกอร์เพื่อจัดการกับงานสามประเภทที่เราสร้างไว้ก่อนหน้านี้.
go run worker/worker.go
คุณควรจะสามารถเห็นข้อความที่พิมพ์ออกมาจากแต่ละแฮนด์เลอร์
ตามที่กล่าวไว้ในภาพรวม, Asynq แบ่งข้อมูลตามคิว งานทั้งหมดที่เพิ่มเข้าคิวเดียวกันจะอยู่ในโหนด Redis เดียวกัน จึง Redis โหนดใดจะเก็บคิวใด?
เราสามารถใช้ CLI ในการตอบคำถามนี้
asynq queue ls --cluster
คำสั่งนี้จะพิมพ์รายการคิวพร้อมกับ:
- โหนดที่ใช้ในกลุ่ม Cluste
- ช่องคาดที่อยู่ในกลุ่ม Cluster
ผลลัพธ์อาจจะมีลักษณะใดลักษณหนึ่งตามนี้:
Queue Cluster KeySlot Cluster Nodes
----- --------------- -------------
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]