Ta strona przedstawia, jak korzystać z klastra Redis jako brokera wiadomości w Asynq.
Zalety korzystania z klastra Redis
Korzystając z klastra Redis, można uzyskać następujące korzyści:
- Łatwe rozmieszczenie danych na wielu węzłach Redis
- Utrzymanie dostępności w przypadku awarii określonych węzłów
- Automatyczne wykonywanie awaryjnego przejęcia
Przegląd
Asynq rozdziela dane na podstawie kolejek. Na powyższym diagramie mamy klaster Redis z 6 instancjami (3 masterami i 3 slave'ami) oraz 4 kolejkami (q1, q2, q3, q4).
- Master1 (oraz jego replika, Slave1) obsługuje q1 i q2.
- Master2 (oraz jego replika, Slave2) obsługuje q3.
- Master3 (oraz jego replika, Slave3) obsługuje q4.
Kiedy korzystasz z asynq.Client
do umieszczania zadań w kolejce, możesz określić kolejkę za pomocą opcji Queue
. Umieszczone zadania będą pobierane przez asynq.Server
(y) z tych kolejek.
Samouczek
W tej sekcji przedstawimy, jak używać klastra Redis jako brokera wiadomości dla Asynq. Zakładamy, że masz klaster 6 instancji Redis uruchomiony na portach 7000-7005. Poniżej znajduje się przykład pliku redis.conf
:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
Następnie utworzymy dwie pliki binarne: klient i pracownik.
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
W client.go
utworzymy nowego klienta asynq.Client
i określimy, jak połączyć się z klastrem Redis, przekazując RedisClusterClientOpt
.
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
Gdy już mamy klienta, utworzymy zadania i umieścimy je w trzech różnych kolejkach:
- powiadomienia
- webhooks
- obrazy
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// Nazwy kolejek
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// Utwórz zadanie "notifications:email" i umieść je w kolejce "notifications".
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Pomyślnie umieszczono w kolejce: %+v\n", res)
// Utwórz zadanie "webhooks:sync" i umieść je w kolejce "webhooks".
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Pomyślnie umieszczono w kolejce: %+v\n", res)
// Utwórz zadanie "images:resize" i umieść je w kolejce "images".
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Pomyślnie umieszczono w kolejce: %+v\n", res)
}
Uruchommy ten program, aby umieścić trzy zadania w kolejkach.
go run client/client.go
Teraz przejdźmy do pracownika, który przetworzy te trzy zadania. W worker.go
utworzymy asynq.Server
, aby pobierać zadania z tych trzech kolejek. Podobnie, połączmy się z naszym klastrem Redis, używając RedisClusterClientOpt
.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// Tutaj ustawiamy tę samą priorytet dla każej kolejki
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", obsłużZadanieEmail)
mux.HandleFunc("webhooks:sync", obsłużZadanieSynchronizacjiWebhook)
mux.HandleFunc("images:resize", obsłużZmianęRozmiaruObrazu)
if err := srv.Run(mux); err != nil {
log.Fatalf("Nie udało się uruchomić serwera: %v", err)
}
}
func obsłużZadanieEmail(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("Wysyłanie e-maila od %d do %d\n", from, to)
return nil
}
func obsłużZadanieSynchronizacjiWebhook(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("Obsługa zadania Webhook: %d\n", data)
return nil
}
func obsłużZmianęRozmiaruObrazu(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("Zmiana rozmiaru obrazu: %s\n", src)
return nil
}
Uruchommy ten serwer w celu obsłużenia trzech zadań, które wcześniej utworzyliśmy.
go run worker/worker.go
Powinieneś zobaczyć wiadomości wydrukowane z każdego handlera.
Zgodnie z opisem ogólnym, Asynq dzieli dane na podstawie kolejek. Wszystkie zadania dodane do tej samej kolejki należą do tego samego węzła Redis. Więc który węzeł Redis przechowuje których kolejkę?
Możemy użyć interfejsu wiersza poleceń, aby odpowiedzieć na to pytanie.
asynq queue ls --cluster
To polecenie wydrukuje listę kolejek wraz z:
- Węzłem klastra, do którego kolejka należy
- Slotem haszującym klastra, do którego kolejka jest mapowana
Wyjście może wyglądać tak:
Kolejka Klucz slotu klastra Węzły klastra
----- --------------- -------------
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]