Halaman ini memperkenalkan cara menggunakan Redis Cluster sebagai penengah pesan dalam Asynq.
Keuntungan Menggunakan Redis Cluster
Dengan menggunakan Redis Cluster, Anda dapat memperoleh keuntungan berikut:
- Mudah membagi data di beberapa node Redis
- Memelihara ketersediaan dalam kejadian kegagalan node tertentu
- Melakukan failover secara otomatis
Gambaran Besar
Asynq membagi data berdasarkan antrian. Pada diagram di atas, kami memiliki Redis Cluster dengan 6 instansi (3 master dan 3 slave) dan 4 antrian (q1, q2, q3, q4).
- Master1 (dan replikanya, Slave1) meng-host q1 dan q2.
- Master2 (dan replikanya, Slave2) meng-host q3.
- Master3 (dan replikanya, Slave3) meng-host q4.
Ketika Anda menggunakan asynq.Client
untuk memasukkan tugas, Anda dapat menentukan antrian menggunakan opsi Queue
. Tugas-tugas yang dimasukkan akan dikonsumsi oleh asynq.Server
(s) menarik tugas dari antrian-antrian ini.
Panduan
Pada bagian ini, kami akan memperkenalkan cara menggunakan Redis Cluster sebagai pialang pesan untuk Asynq. Kami mengasumsikan bahwa Anda memiliki sebuah klaster 6 instansi Redis yang berjalan pada port 7000-7005. Di bawah ini adalah contoh file redis.conf
:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
Selanjutnya, kami akan membuat dua berkas biner: client dan worker.
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
Pada client.go
, kita akan membuat asynq.Client
baru dan menentukan cara untuk terhubung ke Redis Cluster dengan meneruskan RedisClusterClientOpt
.
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
Setelah kita memiliki client, kita akan membuat tugas dan memasukkannya ke dalam tiga antrian yang berbeda:
- notifications
- webhooks
- images
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// Nama antrian
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// Membuat tugas "notifications:email" dan memasukkannya ke dalam antrian "notifications".
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Berhasil dimasukkan ke dalam antrian: %+v\n", res)
// Membuat tugas "webhooks:sync" dan memasukkannya ke dalam antrian "webhooks".
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Berhasil dimasukkan ke dalam antrian: %+v\n", res)
// Membuat tugas "images:resize" dan memasukkannya ke dalam antrian "images".
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Berhasil dimasukkan ke dalam antrian: %+v\n", res)
}
Mari jalankan program ini untuk memasukkan tiga tugas ke dalam antrian.
go run client/client.go
Sekarang, mari beralih ke pekerja untuk memproses tiga tugas ini. Pada worker.go
, kita akan membuat sebuah asynq.Server
untuk mengonsumsi tugas dari ketiga antrian ini. Secara serupa, kita akan terhubung ke Redis Cluster kita menggunakan RedisClusterClientOpt
.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// Tetapkan prioritas yang sama untuk setiap antrian di sini
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Gagal memulai server: %v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("Mengirim email dari %d ke %d\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("Menangani tugas Webhook: %d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("Mengubah ukuran gambar: %s\n", src)
return nil
}
Mari jalankan server pekerja ini untuk menangani tiga tugas yang telah kita buat sebelumnya.
go run worker/worker.go
Anda seharusnya dapat melihat pesan yang dicetak dari setiap penangan.
Seperti yang disebutkan pada gambaran, Asynq membagi data berdasarkan antrian. Semua tugas yang ditambahkan ke antrian yang sama dimiliki oleh node Redis yang sama. Jadi, node Redis mana yang menyimpan antrian mana?
Kita dapat menggunakan CLI untuk menjawab pertanyaan ini.
asynq queue ls --cluster
Perintah ini akan mencetak daftar antrian, bersama dengan:
- Node klaster tempat antrian tersebut dimiliki
- Slot hash klaster tempat antrian tersebut dipetakan
Output mungkin terlihat seperti ini:
Queue Cluster KeySlot Cluster Nodes
----- --------------- -------------
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]