หน้านี้นำเสนอวิธีการใช้ Redis Cluster เป็นตัวจัดการข้อความใน Asynq

ข้อดีของการใช้ Redis Cluster

การใช้ Redis Cluster จะทำให้คุณได้รับประโยชน์ต่อไปนี้:

  • การแบ่งข้อมูลได้อย่างง่ายในหลายๆ โหนดของ Redis
  • รักษาความพร้อมใช้งานในกรณีเกิดความล้มเหลวของโหนดบางตัว
  • ดำเนินการ Failover โดยอัตโนมัติ

ภาพรวม

Cluster Queue Illustration

Asynq แบ่งข้อมูลลงตามคิว ในแผนภาพข้างต้น เรามี Redis Cluster ที่มี 6 อินสแตนซ์ (3 ตัวเป็น Master และ 3 ตัวเป็น Slave) และมี 4 คิว (q1, q2, q3, q4)

  • Master1 (และ Slave1 ที่เป็นตัวคัมทำ) รองรับ q1 และ q2
  • Master2 (และ Slave2 ที่เป็นตัวคัมทำ) รองรับ q3
  • Master3 (และ Slave3 ที่เป็นตัวคัมทำ) รองรับ q4

เมื่อคุณใช้ asynq.Client เพื่อนำงานเข้าคิว คุณสามารถระบุคิวโดยใช้ ตัวเลือก Queue งานที่นำเข้าคิวจะถูกโดนใช้งานโดย asynq.Server(s) ที่ดึงงานจากคิวเหล่านี้

รายการสอน

ในส่วนนี้ เราจะแนะนำวิธีการใช้ Redis Cluster เป็นตัวกำหนดข้อความสำหรับ Asynq ซึ่งเราสมมติว่าคุณมีคลัสเตอร์ของ 6 รายการ Redis ที่ทำงานอยู่บนพอร์ต 7000-7005 ด้านล่างนี้เป็นตัวอย่างของไฟล์ redis.conf:

พอร์ต 7000
cluster-enabled ใช่
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly ใช่

ต่อมา เราจะสร้างไฟล์ซอร์สที่ลีบิลคลิกสองไฟล์: ไคลเอ็นต์และเวิร์กเกอร์

ไปโมดอินิตอฉัน asynq-redis-cluster-quickstart
mkdir ไคลเอ็นต์ เวิร์กเกอร์
สถาตการ์ไคลเอ็นต์/ไคลเอ็นต์โก
สถาตการ์เวิร์กเกอร์/เวิร์กเกอร์โก

ใน client.go เราจะสร้าง asynq.Client ใหม่และระบุว่าจะเชื่อมต่อกับ Redis Cluster โดยการส่ง RedisClusterClientOpt.

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

เมื่อเรามีไคลเอ็นต์แล้ว เราจะสร้างงานและคิวที่ต่างกันเข้าคิวทั้งสาม:

  • การแจ้งเตือน
  • เว็บฮุก
  • รูปภาพ
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// ชื่อคิว
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // สร้างงาน "notifications:email" และส่งเข้าคิว "notifications"
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("เข้าคิวเรียบร้อย: %+v\n", res)

    // สร้างงาน "webhooks:sync" และส่งเข้าคิว "webhooks"
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("เข้าคิวเรียบร้อย: %+v\n", res)

    // สร้างงาน "images:resize" และส่งเข้าคิว "images"
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "ทางบาง/ไปยัง/รูปภาพ"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("เข้าคิวเรียบร้อย: %+v\n", res)
}

ให้เรามารันโปรแกรมนี้เพื่อเข้าคิวงานสามชิ้นเข้าไปในคิว

ไปรันไคลเอ็นต์/ไคลเอ็นต์โก

ตอนนี้ เรามาดูที่เวิร์กเกอร์เพื่อประมวลผลงานสามชิ้นเหล่านี้ ใน worker.go เราจะสร้าง asynq.Server เพื่อบริโภคงานจากคิวทั้งสามเหล่านี้ ในทำนองเดียวกัน เราจะเชื่อมต่อกับ Redis Cluster ของเราโดยใช้ RedisClusterClientOpt.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// ตั้งค่าความสำคัญเท่ากันสำหรับแต่ละคิวที่นี่
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("เริ่มเซิร์ฟเวอร์ไม่สำเร็จ: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("ส่งอีเมลจาก %d ถึง %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("การจัดการงาน Webhook: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("ปรับขนาดรูปภาพ: %s\n", src)
	return nil
}

ลองรันเซิร์ฟเวอร์เวิร์คเกอร์เพื่อจัดการกับงานสามประเภทที่เราสร้างไว้ก่อนหน้านี้.

go run worker/worker.go

คุณควรจะสามารถเห็นข้อความที่พิมพ์ออกมาจากแต่ละแฮนด์เลอร์

ตามที่กล่าวไว้ในภาพรวม, Asynq แบ่งข้อมูลตามคิว งานทั้งหมดที่เพิ่มเข้าคิวเดียวกันจะอยู่ในโหนด Redis เดียวกัน จึง Redis โหนดใดจะเก็บคิวใด?

เราสามารถใช้ CLI ในการตอบคำถามนี้

asynq queue ls --cluster

คำสั่งนี้จะพิมพ์รายการคิวพร้อมกับ:

  • โหนดที่ใช้ในกลุ่ม Cluste
  • ช่องคาดที่อยู่ในกลุ่ม Cluster

ผลลัพธ์อาจจะมีลักษณะใดลักษณหนึ่งตามนี้:

Queue          Cluster KeySlot  Cluster Nodes
-----          ---------------  -------------
images         9450              [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks       4418              [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications  16340             [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]