Esta página presenta cómo utilizar Redis Cluster como un intermediario de mensajes en Asynq.
Ventajas de usar Redis Cluster
Al utilizar Redis Cluster, puedes obtener las siguientes ventajas:
- Facilidad para fragmentar datos en varios nodos de Redis
- Mantener la disponibilidad en caso de fallos de ciertos nodos
- Realizar automáticamente failover
Resumen
Asynq fragmenta los datos en base a colas. En el diagrama anterior, tenemos un clúster de Redis con 6 instancias (3 maestros y 3 esclavos) y 4 colas (q1, q2, q3, q4).
- El Maestro1 (y su réplica, Esclavo1) albergan q1 y q2.
- El Maestro2 (y su réplica, Esclavo2) albergan q3.
- El Maestro3 (y su réplica, Esclavo3) albergan q4.
Cuando utilizas asynq.Client
para encolar tareas, puedes especificar la cola utilizando la opción Queue
. Las tareas encoladas serán consumidas por el/los asynq.Server
que toman tareas de estas colas.
Tutorial
En esta sección, vamos a introducir cómo usar Redis Cluster como el intermediario de mensajes para Asynq. Suponemos que tienes un clúster de 6 instancias de Redis ejecutándose en los puertos 7000-7005. A continuación se muestra un ejemplo del archivo redis.conf
:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
A continuación, crearemos dos archivos binarios: cliente y trabajador.
go mod init asynq-redis-cluster-quickstart
mkdir cliente trabajador
touch cliente/cliente.go trabajador/trabajador.go
En cliente.go
, crearemos un nuevo asynq.Client
y especificaremos cómo conectarnos al clúster de Redis pasando RedisClusterClientOpt
.
cliente := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
Una vez que tengamos el cliente, crearemos tareas y las encolaremos en tres colas diferentes:
- notificaciones
- webhooks
- imágenes
// cliente.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// Nombres de las colas
const (
ColaNotificaciones = "notificaciones"
ColaWebhooks = "webhooks"
ColaImagenes = "imágenes"
)
func main() {
cliente := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer cliente.Close()
// Crea una tarea "notificaciones:email" y encola en la cola "notificaciones".
tarea := asynq.NewTask("notificaciones:email", map[string]interface{}{"para": 123, "de": 456})
res, err := cliente.Enqueue(tarea, asynq.Queue(ColaNotificaciones))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Encolado exitosamente: %+v\n", res)
// Crea una tarea "webhooks:sync" y encola en la cola "webhooks".
tarea = asynq.NewTask("webhooks:sync", map[string]interface{}{"datos": 123})
res, err = cliente.Enqueue(tarea, asynq.Queue(ColaWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Encolado exitosamente: %+v\n", res)
// Crea una tarea "imágenes:resize" y encola en la cola "imágenes".
tarea = asynq.NewTask("imágenes:resize", map[string]interface{}{"src": "alguna/ruta/a/imagen"})
res, err = cliente.Enqueue(tarea, asynq.Queue(ColaImagenes))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Encolado exitosamente: %+v\n", res)
}
Ejecutemos este programa para encolar las tres tareas en las colas.
go run cliente/cliente.go
Ahora, vamos al trabajador para procesar estas tres tareas. En trabajador.go
, crearemos un asynq.Server
para consumir tareas de estas tres colas. De manera similar, nos conectaremos a nuestro clúster de Redis usando RedisClusterClientOpt
.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// Establecer la misma prioridad para cada cola aquí
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Error al iniciar el servidor: %v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("Enviando un correo electrónico desde %d a %d\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("Manejando la tarea de Webhook: %d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("Redimensionando la imagen: %s\n", src)
return nil
}
Para ejecutar este servidor de trabajador y manejar las tres tareas que creamos anteriormente, use el siguiente comando.
go run worker/worker.go
Debería poder ver los mensajes impresos desde cada controlador.
Como se mencionó en la descripción general, Asynq divide los datos según las colas. Todas las tareas añadidas a la misma cola pertenecen al mismo nodo de Redis. Entonces, ¿qué nodo de Redis aloja qué cola?
Podemos usar la CLI para responder a esta pregunta.
asynq queue ls --cluster
Este comando imprimirá una lista de colas, junto con:
- El nodo del clúster al que pertenece la cola
- El hash slot del clúster al que está asignada la cola
La salida podría ser algo como esto:
Cola Slot clave del clúster Nodos del clúster
----- --------------- -------------
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]