Cette page présente comment utiliser Redis Cluster en tant que courtier de messages dans Asynq.

Avantages de l'utilisation de Redis Cluster

En utilisant Redis Cluster, vous pouvez bénéficier des avantages suivants :

  • Fragmentation facile des données sur plusieurs nœuds Redis
  • Maintien de la disponibilité en cas de défaillance de certains nœuds
  • Exécution automatique de la bascule

Aperçu

Illustration de la file d'attente du cluster

Asynq fragmente les données en fonction des files d'attente. Dans le diagramme ci-dessus, nous avons un Redis Cluster avec 6 instances (3 maîtres et 3 esclaves) et 4 files d'attente (q1, q2, q3, q4).

  • Le Maître1 (et sa réplique, Esclave1) héberge q1 et q2.
  • Le Maître2 (et sa réplique, Esclave2) héberge q3.
  • Le Maître3 (et sa réplique, Esclave3) héberge q4.

Lorsque vous utilisez asynq.Client pour mettre en file d'attente des tâches, vous pouvez spécifier la file d'attente en utilisant l'option Queue. Les tâches mises en file d'attente seront consommées par le(s) asynq.Server qui extrait des tâches de ces files d'attente.

Tutoriel

Dans cette section, nous allons présenter comment utiliser Redis Cluster comme courtier de messages pour Asynq. Nous supposons que vous avez un cluster de 6 instances Redis en cours d'exécution sur les ports 7000-7005. Ci-dessous un exemple du fichier redis.conf :

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

Ensuite, nous allons créer deux fichiers binaires : client et worker.

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

Dans client.go, nous allons créer un nouveau asynq.Client et spécifier comment se connecter au Redis Cluster en passant RedisClusterClientOpt.

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

Une fois que nous avons le client, nous allons créer des tâches et les mettre en file d'attente dans trois files différentes :

  • notifications
  • webhooks
  • images
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// Noms de file d'attente
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // Créer une tâche "notifications:email" et la mettre en file d'attente dans la file "notifications".
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Mis en file d'attente avec succès : %+v\n", res)

    // Créer une tâche "webhooks:sync" et la mettre en file d'attente dans la file "webhooks".
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Mis en file d'attente avec succès : %+v\n", res)

    // Créer une tâche "images:resize" et la mettre en file d'attente dans la file "images".
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Mis en file d'attente avec succès : %+v\n", res)
}

Exécutons ce programme pour mettre en file d'attente les trois tâches dans les files.

go run client/client.go

Maintenant, allons au worker pour traiter ces trois tâches. Dans worker.go, nous allons créer un asynq.Server pour consommer les tâches de ces trois files. De même, nous nous connecterons à notre cluster Redis en utilisant RedisClusterClientOpt.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// Définissez la même priorité pour chaque file d'attente ici
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("Échec du démarrage du serveur : %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("Envoi d'un e-mail de %d à %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Gestion de la tâche de webhook : %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("Redimensionnement de l'image : %s\n", src)
	return nil
}

Exécutons ce serveur worker pour gérer les trois tâches que nous avons créées précédemment.

go run worker/worker.go

Vous devriez pouvoir voir les messages imprimés par chaque gestionnaire.


Comme mentionné dans la vue d'ensemble, Asynq compartimente les données en fonction des files d'attente. Toutes les tâches ajoutées à la même file d'attente appartiennent au même nœud Redis. Ainsi, quel nœud Redis héberge quelle file d'attente ?

Nous pouvons utiliser l'interface de ligne de commande (CLI) pour répondre à cette question.

asynq queue ls --cluster


Cette commande affichera une liste de files d'attente, avec :

- Le nœud de cluster auquel la file d'attente appartient
- La plage de hachage de cluster à laquelle la file d'attente est mappée

La sortie pourrait ressembler à ceci :

Queue Cluster KeySlot Cluster Nodes


images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]