Esta página presenta cómo utilizar Redis Cluster como un intermediario de mensajes en Asynq.

Ventajas de usar Redis Cluster

Al utilizar Redis Cluster, puedes obtener las siguientes ventajas:

  • Facilidad para fragmentar datos en varios nodos de Redis
  • Mantener la disponibilidad en caso de fallos de ciertos nodos
  • Realizar automáticamente failover

Resumen

Ilustración de la cola del clúster

Asynq fragmenta los datos en base a colas. En el diagrama anterior, tenemos un clúster de Redis con 6 instancias (3 maestros y 3 esclavos) y 4 colas (q1, q2, q3, q4).

  • El Maestro1 (y su réplica, Esclavo1) albergan q1 y q2.
  • El Maestro2 (y su réplica, Esclavo2) albergan q3.
  • El Maestro3 (y su réplica, Esclavo3) albergan q4.

Cuando utilizas asynq.Client para encolar tareas, puedes especificar la cola utilizando la opción Queue. Las tareas encoladas serán consumidas por el/los asynq.Server que toman tareas de estas colas.

Tutorial

En esta sección, vamos a introducir cómo usar Redis Cluster como el intermediario de mensajes para Asynq. Suponemos que tienes un clúster de 6 instancias de Redis ejecutándose en los puertos 7000-7005. A continuación se muestra un ejemplo del archivo redis.conf:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

A continuación, crearemos dos archivos binarios: cliente y trabajador.

go mod init asynq-redis-cluster-quickstart
mkdir cliente trabajador
touch cliente/cliente.go trabajador/trabajador.go

En cliente.go, crearemos un nuevo asynq.Client y especificaremos cómo conectarnos al clúster de Redis pasando RedisClusterClientOpt.

cliente := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

Una vez que tengamos el cliente, crearemos tareas y las encolaremos en tres colas diferentes:

  • notificaciones
  • webhooks
  • imágenes
// cliente.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// Nombres de las colas
const (
    ColaNotificaciones = "notificaciones"
    ColaWebhooks       = "webhooks"
    ColaImagenes       = "imágenes"
)

func main() {
    cliente := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer cliente.Close()

    // Crea una tarea "notificaciones:email" y encola en la cola "notificaciones".
    tarea := asynq.NewTask("notificaciones:email", map[string]interface{}{"para": 123, "de": 456})
    res, err := cliente.Enqueue(tarea, asynq.Queue(ColaNotificaciones))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Encolado exitosamente: %+v\n", res)

    // Crea una tarea "webhooks:sync" y encola en la cola "webhooks".
    tarea = asynq.NewTask("webhooks:sync", map[string]interface{}{"datos": 123})
    res, err = cliente.Enqueue(tarea, asynq.Queue(ColaWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Encolado exitosamente: %+v\n", res)

    // Crea una tarea "imágenes:resize" y encola en la cola "imágenes".
    tarea = asynq.NewTask("imágenes:resize", map[string]interface{}{"src": "alguna/ruta/a/imagen"})
    res, err = cliente.Enqueue(tarea, asynq.Queue(ColaImagenes))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Encolado exitosamente: %+v\n", res)
}

Ejecutemos este programa para encolar las tres tareas en las colas.

go run cliente/cliente.go

Ahora, vamos al trabajador para procesar estas tres tareas. En trabajador.go, crearemos un asynq.Server para consumir tareas de estas tres colas. De manera similar, nos conectaremos a nuestro clúster de Redis usando RedisClusterClientOpt.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// Establecer la misma prioridad para cada cola aquí
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("Error al iniciar el servidor: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("Enviando un correo electrónico desde %d a %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Manejando la tarea de Webhook: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("Redimensionando la imagen: %s\n", src)
	return nil
}

Para ejecutar este servidor de trabajador y manejar las tres tareas que creamos anteriormente, use el siguiente comando.

go run worker/worker.go

Debería poder ver los mensajes impresos desde cada controlador.

Como se mencionó en la descripción general, Asynq divide los datos según las colas. Todas las tareas añadidas a la misma cola pertenecen al mismo nodo de Redis. Entonces, ¿qué nodo de Redis aloja qué cola?

Podemos usar la CLI para responder a esta pregunta.

asynq queue ls --cluster

Este comando imprimirá una lista de colas, junto con:

  • El nodo del clúster al que pertenece la cola
  • El hash slot del clúster al que está asignada la cola

La salida podría ser algo como esto:

Cola          Slot clave del clúster       Nodos del clúster
-----          ---------------           -------------
images         9450              [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks       4418              [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications  16340             [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]