Esta página apresenta como usar o Redis Cluster como um corretor de mensagens no Asynq.

Vantagens de Usar o Redis Cluster

Ao usar o Redis Cluster, você pode obter as seguintes vantagens:

  • Facilmente dividir dados entre vários nós Redis
  • Manter a disponibilidade em caso de falhas de determinado nó
  • Realizar failover automaticamente

Visão Geral

Ilustração da Fila do Cluster

O Asynq divide os dados com base em filas. No diagrama acima, temos um Redis Cluster com 6 instâncias (3 mestres e 3 escravos) e 4 filas (q1, q2, q3, q4).

  • Mestre1 (e sua réplica, Escravo1) hospeda q1 e q2.
  • Mestre2 (e sua réplica, Escravo2) hospeda q3.
  • Mestre3 (e sua réplica, Escravo3) hospeda q4.

Quando você usa asynq.Client para enfileirar tarefas, você pode especificar a fila usando a opção Queue. As tarefas enfileiradas serão consumidas por servidor(es) asynq.Server que puxam tarefas dessas filas.

Tutorial

Nesta seção, introduziremos como usar o Redis Cluster como o corretor de mensagens para Asynq. Pressupomos que você tenha um cluster de 6 instâncias do Redis em execução nas portas 7000-7005. Abaixo está um exemplo do arquivo redis.conf:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

Em seguida, vamos criar dois arquivos binários: cliente e trabalhador.

go mod init asynq-redis-cluster-quickstart
mkdir cliente trabalhador
touch cliente/cliente.go trabalhador/trabalhador.go

No arquivo cliente.go, vamos criar um novo asynq.Client e especificar como conectar ao Redis Cluster passando RedisClusterClientOpt.

cliente := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

Depois de termos o cliente, criaremos tarefas e as colocaremos em três filas diferentes:

  • notificações
  • webhooks
  • imagens
// cliente.go

pacote principal

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// Nomes das filas
const (
    FilaNotificacoes = "notificacoes"
    FilaWebhooks     = "webhooks"
    FilaImagens      = "imagens"
)

func main() {
    cliente := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer cliente.Close()

    // Criar uma tarefa "notificacoes:email" e colocá-la na fila "notificacoes".
    tarefa := asynq.NewTask("notificacoes:email", map[string]interface{}{"para": 123, "de": 456})
    res, err := cliente.Enqueue(tarefa, asynq.Queue(FilaNotificacoes))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Colocado na fila com sucesso: %+v\n", res)

    // Criar uma tarefa "webhooks:sync" e colocá-la na fila "webhooks".
    tarefa = asynq.NewTask("webhooks:sync", map[string]interface{}{"dados": 123})
    res, err = cliente.Enqueue(tarefa, asynq.Queue(FilaWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Colocado na fila com sucesso: %+v\n", res)

    // Criar uma tarefa "imagens:resize" e colocá-la na fila "imagens".
    tarefa = asynq.NewTask("imagens:resize", map[string]interface{}{"src": "algum/caminho/para/imagem"})
    res, err = cliente.Enqueue(tarefa, asynq.Queue(FilaImagens))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Colocado na fila com sucesso: %+v\n", res)
}

Vamos executar este programa para colocar as três tarefas nas filas.

go run cliente/cliente.go

Agora, vamos para o trabalhador para processar essas três tarefas. No arquivo trabalhador.go, vamos criar um asynq.Server para consumir tarefas dessas três filas. Da mesma forma, iremos conectar ao nosso Redis Cluster usando RedisClusterClientOpt.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// Defina a mesma prioridade para cada fila aqui
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("Falha ao iniciar o servidor: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("Enviando um e-mail de %d para %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Manuseando a tarefa de Webhook: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("Redimensionando a imagem: %s\n", src)
	return nil
}

Vamos executar este servidor de trabalhadores para processar as três tarefas que criamos anteriormente.

go run worker/worker.go

Você deve ser capaz de ver as mensagens impressas de cada manipulador.


Conforme mencionado na visão geral, o Asynq divide os dados com base nas filas. Todas as tarefas adicionadas à mesma fila pertencem ao mesmo nó Redis. Então, qual nó Redis hospeda qual fila?

Podemos usar a CLI para responder a essa pergunta.

asynq queue ls --cluster


Este comando imprimirá uma lista de filas, juntamente com:

- O nó do cluster ao qual a fila pertence
- O slot hash do cluster ao qual a fila está mapeada

A saída pode se parecer com algo assim:

Fila Cluster KeySlot Nós do Cluster


images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]