Diese Seite führt ein, wie man Redis Cluster als Message Broker in Asynq verwendet.
Vorteile der Verwendung von Redis Cluster
Durch die Verwendung von Redis Cluster können Sie die folgenden Vorteile erzielen:
- Einfaches Sharden von Daten über mehrere Redis-Knoten
- Aufrechterhaltung der Verfügbarkeit im Falle bestimmter Knotenausfälle
- Automatisches Failover
Überblick
Asynq sharded Daten auf der Grundlage von Queues. Im obigen Diagramm haben wir einen Redis Cluster mit 6 Instanzen (3 Master und 3 Slaves) und 4 Queues (q1, q2, q3, q4).
- Master1 (und seine Replik, Slave1) hostet q1 und q2.
- Master2 (und seine Replik, Slave2) hostet q3.
- Master3 (und seine Replik, Slave3) hostet q4.
Wenn Sie asynq.Client
verwenden, um Aufgaben in die Warteschlange zu stellen, können Sie die Warteschlange mithilfe der Option "Queue" spezifizieren. Die eingereihten Aufgaben werden von asynq.Server
(s) konsumiert, die Aufgaben aus diesen Warteschlangen ziehen.
Tutorial
In diesem Abschnitt werden wir zeigen, wie man Redis Cluster als Message Broker für Asynq verwendet. Wir gehen davon aus, dass Sie einen Cluster von 6 Redis-Instanzen haben, die auf den Ports 7000-7005 laufen. Nachfolgend finden Sie ein Beispiel der redis.conf
-Datei:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
Als nächstes erstellen wir zwei ausführbare Dateien: Client und Worker.
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
In client.go
erstellen wir einen neuen asynq.Client
und geben an, wie man sich mit dem Redis Cluster verbindet, indem wir RedisClusterClientOpt
übergeben.
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
Sobald wir den Client erstellt haben, erstellen wir Aufgaben und stellen sie in drei verschiedenen Warteschlangen ein:
- Benachrichtigungen
- Webhooks
- Bilder
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// Namen der Warteschlangen
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// Erstellen einer "notifications:email"-Aufgabe und Einreihen in die "notifications"-Warteschlange.
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Erfolgreich eingereiht: %+v\n", res)
// Erstellen einer "webhooks:sync"-Aufgabe und Einreihen in die "webhooks"-Warteschlange.
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Erfolgreich eingereiht: %+v\n", res)
// Erstellen einer "images:resize"-Aufgabe und Einreihen in die "images"-Warteschlange.
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Erfolgreich eingereiht: %+v\n", res)
}
Führen Sie nun dieses Programm aus, um die drei Aufgaben in die Warteschlangen einzufügen.
go run client/client.go
Gehen wir nun zum Worker, um diese drei Aufgaben zu bearbeiten. In worker.go
erstellen wir einen asynq.Server
, um Aufgaben aus diesen drei Warteschlangen zu verarbeiten. Ebenso verbinden wir uns mit unserem Redis-Cluster unter Verwendung von RedisClusterClientOpt
.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// Legen Sie hier die gleiche Priorität für jede Warteschlange fest.
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Fehler beim Starten des Servers: %v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("Senden einer E-Mail von %d an %d\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("Behandlung der Webhook-Aufgabe: %d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("Ändern der Bildgröße: %s\n", src)
return nil
}
Führen Sie diesen Worker-Server aus, um die drei zuvor erstellten Aufgaben zu verarbeiten.
go run worker/worker.go
Sie sollten die von jedem Handler gedruckten Nachrichten sehen können.
Wie im Überblick erwähnt, schichtet Asynq Daten basierend auf Warteschlangen. Alle Aufgaben, die zur gleichen Warteschlange hinzugefügt werden, gehören zu demselben Redis-Knoten. Welcher Redis-Knoten hostet also welche Warteschlange?
Wir können die CLI verwenden, um diese Frage zu beantworten.
asynq queue ls --cluster
Dieser Befehl gibt eine Liste der Warteschlangen aus, zusammen mit:
- Dem Clusterknoten, zu dem die Warteschlange gehört
- Dem Cluster-Hash-Slot, auf den die Warteschlange abgebildet ist
Die Ausgabe könnte so aussehen:
Warteschlange Cluster KeySlot Cluster-Knoten
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91867953ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2742e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd64fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f3a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]