Questa pagina introduce come utilizzare Redis Cluster come message broker in Asynq.
Vantaggi dell'Utilizzo di Redis Cluster
Utilizzando Redis Cluster, è possibile ottenere i seguenti vantaggi:
- Suddividere facilmente i dati su più nodi Redis
- Mantenere la disponibilità in caso di guasti di determinati nodi
- Eseguire automaticamente il failover
Panoramica
Asynq suddivide i dati in base alle code. Nel diagramma sopra, abbiamo un Redis Cluster con 6 istanze (3 master e 3 slave) e 4 code (q1, q2, q3, q4).
- Master1 (e il suo replica, Slave1) ospita q1 e q2.
- Master2 (e il suo replica, Slave2) ospita q3.
- Master3 (e il suo replica, Slave3) ospita q4.
Quando si utilizza asynq.Client
per accodare compiti, è possibile specificare la coda utilizzando l'opzione Queue
. I compiti accodati verranno consumati da asynq.Server
(s) che estraggono i compiti da queste code.
Tutorial
In questa sezione, introdurremo come utilizzare Redis Cluster come message broker per Asynq. Supponiamo che tu abbia un cluster di 6 istanze di Redis in esecuzione sulle porte 7000-7005. Di seguito è riportato un esempio del file redis.conf
:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
Successivamente, creeremo due file binari: client e worker.
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
In client.go
, creeremo un nuovo asynq.Client
e specificare come connettersi al Redis Cluster passando RedisClusterClientOpt
.
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
Una volta ottenuto il client, creeremo dei task e li metteremo in coda in tre code diverse:
- notifications
- webhooks
- images
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// Nomi delle code
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// Creare un task "notifications:email" e metterlo in coda nella coda "notifications".
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("In coda con successo: %+v\n", res)
// Creare un task "webhooks:sync" e metterlo in coda nella coda "webhooks".
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("In coda con successo: %+v\n", res)
// Creare un task "images:resize" e metterlo in coda nella coda "images".
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("In coda con successo: %+v\n", res)
}
Eseguiamo questo programma per mettere in coda i tre task nelle code.
go run client/client.go
Ora, passiamo al worker per elaborare questi tre task. In worker.go
, creeremo un asynq.Server
per consumare i task da queste tre code. Allo stesso modo, ci collegheremo al nostro Redis Cluster usando RedisClusterClientOpt
.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// Imposta la stessa priorità per ciascuna coda qui
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Impossibile avviare il server: %v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("Invio di una email da %d a %d\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("Gestione del compito di Webhook: %d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("Ridimensionamento dell'immagine: %s\n", src)
return nil
}
Eseguiamo questo server worker per gestire i tre compiti che abbiamo creato in precedenza.
go run worker/worker.go
Dovresti essere in grado di vedere i messaggi stampati da ciascun gestore.
Come menzionato nella panoramica, Asynq frammenta i dati in base alle code. Tutti i compiti aggiunti alla stessa coda appartengono allo stesso nodo Redis. Quindi, quale nodo Redis ospita quale coda?
Possiamo utilizzare la CLI per rispondere a questa domanda.
asynq queue ls --cluster
Questo comando stamperà un elenco di code, insieme a:
- Il nodo del cluster a cui appartiene la coda
- Lo slot hash del cluster a cui è associata la coda
L'output potrebbe assomigliare a questo:
Queue Chiave slot del cluster Nodi del cluster
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]