রেডিস ক্লাস্টার ব্যবহারের সুবিধা

রেডিস ক্লাস্টার ব্যবহার করে, আপনি নিম্নলিখিত সুবিধা অর্জন করতে পারেন:

  • একাধিক রেডিস নোডে ডাটা শার্ড করা সহজ
  • নির্দিষ্ট নোড ব্যর্থ হওয়ার ঘটনায় উপস্থিতি বজায় রাখা
  • স্বয়ংক্রিয় ফেইলওভার করা

ওভারভিউ

ক্লাস্টার কিউ চিত্রাঙ্কন

Asynq তথ্য কিউ ভিত্তিক শার্ড হয়। উপরের ডায়াগ্রামে, আমরা 6 টি ইনস্ট্যান্স (3 টি মাস্টার এবং 3 টি স্লেভ) এবং 4 টি কিউ (q1, q2, q3, q4) সহ একটি রেডিস ক্লাস্টার রয়েছে।

  • Master1 (এবং তার রিপ্লিকা, Slave1) q1 এবং q2 হোস্ট করে।
  • Master2 (এবং তার রিপ্লিকা, Slave2) q3 হোস্ট করে।
  • Master3 (এবং তার রিপ্লিকা, Slave3) q4 হোস্ট করে।

আপনি যখন asynq.Client ব্যবহার করেন টাস্ক ইঞ্জিউ করতে, আপনি Queue অপশন ব্যবহার করে কিউ নির্দিষ্ট করতে পারেন। ইঞ্জিউ করা হলে টাস্কগুলি asynq.Server দ্বারা খাদ্য হবে, যে(গুলি) এই কিউ থেকে নামকরণ টাস্কগুলি তুলছেন।

টিউটোরিয়াল

এই অধ্যায়ে, আমরা দেখাব কীভাবে Redis Cluster ব্যবহার করে Asynq এর জন্য একটি মেসেজ ব্রোকার হিসেবে কাজ করে। আমরা ধরে নিচ্ছি যে আপনার কম্পিউটারে 6 টি Redis ইনস্ট্যান্স চালু আছে যা পোর্ট 7000-7005 এ চালু আছে। নিচে redis.conf ফাইলের একটি উদাহরণ দেওয়া হল:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

পরবর্তীতে, আমরা দুটি বাইনারি ফাইল তৈরি করব: ক্লায়েন্ট এবং ওয়ার্কার।

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

client.go ফাইলে, আমরা একটি নতুন asynq.Client তৈরি করব এবং Redis Cluster এ কীভাবে সংযোগ করব তা নির্ধারণ করব এমন RedisClusterClientOpt পাঠানো হবে।

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

ক্লায়েন্ট যখন পাব তখন আমরা তিনটি পাঠ্যবাজি তৈরি করব এবং তাদেরকে তিনটি পৃথক পৃথক কিউতে এনকিউ করব:

  • বিজ্ঞপ্তিসমূহ
  • ওয়েবহুক
  • ছবি
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// কিউ নামগুলি
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // "notifications:email" একটি টাস্ক তৈরি করব এবং "notifications" কিউতে এনকিউ করব।
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("সফলভাবে এনকুইইড হয়েছে: %+v\n", res)

    // "webhooks:sync" একটি টাস্ক তৈরি করব এবং "webhooks" কিউতে এনকিউ করব।
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("সফলভাবে এনকুইইড হয়েছে: %+v\n", res)

    // "images:resize" একটি টাস্ক তৈরি করব এবং "images" কিউতে এনকিউ করব।
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("সফলভাবে এনকুইইড হয়েছে: %+v\n", res)
}

এই প্রোগ্রামটি চালাব এবং তিনটি কিউতে তিনটি টাস্ক এনকিউ করব।

go run client/client.go

এখন, আসা যাক ওয়ার্কারে এই তিনটি টাস্ক প্রসেস করার জন্য। worker.go ফাইলে, আমরা একটি asynq.Server তৈরি করব যাতে এই তিনটি কিউতে থেকে টাস্ক গ্রহণ করতে পারে। একইভাবে, আমরা Redis Cluster এ সংযোগ করার জন্য RedisClusterClientOpt ব্যবহার করব।

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// প্রতিটি কিউতে একই অগ্রাধিকার যোগ করুন
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("সার্ভার চালু করতে ব্যর্থ: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("ইমেইল পাঠানো হচ্ছে %d থেকে %d এক্ষেত্রে\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("ওয়েবহুক টাস্ক হ্যান্ডল করা হচ্ছে: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("চিত্র আকার পরিবর্তন হচ্ছে: %s\n", src)
	return nil
}

আমরা আগে তৈরি তিনটি টাস্ক সম্পর্কে কাজ করার জন্য এই ওয়ার্কার সার্ভার রান করাব।

go run worker/worker.go

আপনি দেখতে পারবেন যে, প্রতিটি হ্যান্ডলার থেকে ম্যাসেজগুলি মুদ্রিত হয়েছে।


ওভারভিউে উল্লিখিত মত এসিংক ডাটা কিউ অনুযায়ী ভাগ করা হয়। একই কিউতে যুক্ত সমস্ত টাস্ক একই রেডিস নোডের ভেতর পড়ে। অতএব, যে রেডিস নোড কোন কিউতে হোস্ট করে?

আমাদের এই প্রশ্নের উত্তর দিতে আমরা CLI ব্যবহার করতে পারি।

asynq queue ls --cluster


এই কমান্ডটি একটি কিউর তালিকা ছাপাবে, যা নিম্নলিখিত সহ প্রিন্ট হবে:

- যে কোন নোডে কিউ অনুযায়ী ক্লাস্টার নোড
- ক্লাস্টার হ্যাশ স্লট, যেখানে কিউ পার্থক্যিক ভাবে ম্যাপ হয়েছে

আউটপুট কিছু একটি দেখতে পারে এরকম:

Queue Cluster KeySlot Cluster Nodes


images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]