Esta página apresenta como usar o Redis Cluster como um corretor de mensagens no Asynq.
Vantagens de Usar o Redis Cluster
Ao usar o Redis Cluster, você pode obter as seguintes vantagens:
- Facilmente dividir dados entre vários nós Redis
- Manter a disponibilidade em caso de falhas de determinado nó
- Realizar failover automaticamente
Visão Geral
O Asynq divide os dados com base em filas. No diagrama acima, temos um Redis Cluster com 6 instâncias (3 mestres e 3 escravos) e 4 filas (q1, q2, q3, q4).
- Mestre1 (e sua réplica, Escravo1) hospeda q1 e q2.
- Mestre2 (e sua réplica, Escravo2) hospeda q3.
- Mestre3 (e sua réplica, Escravo3) hospeda q4.
Quando você usa asynq.Client
para enfileirar tarefas, você pode especificar a fila usando a opção Queue
. As tarefas enfileiradas serão consumidas por servidor(es) asynq.Server
que puxam tarefas dessas filas.
Tutorial
Nesta seção, introduziremos como usar o Redis Cluster como o corretor de mensagens para Asynq. Pressupomos que você tenha um cluster de 6 instâncias do Redis em execução nas portas 7000-7005. Abaixo está um exemplo do arquivo redis.conf
:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
Em seguida, vamos criar dois arquivos binários: cliente e trabalhador.
go mod init asynq-redis-cluster-quickstart
mkdir cliente trabalhador
touch cliente/cliente.go trabalhador/trabalhador.go
No arquivo cliente.go
, vamos criar um novo asynq.Client
e especificar como conectar ao Redis Cluster passando RedisClusterClientOpt
.
cliente := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
Depois de termos o cliente, criaremos tarefas e as colocaremos em três filas diferentes:
- notificações
- webhooks
- imagens
// cliente.go
pacote principal
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// Nomes das filas
const (
FilaNotificacoes = "notificacoes"
FilaWebhooks = "webhooks"
FilaImagens = "imagens"
)
func main() {
cliente := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer cliente.Close()
// Criar uma tarefa "notificacoes:email" e colocá-la na fila "notificacoes".
tarefa := asynq.NewTask("notificacoes:email", map[string]interface{}{"para": 123, "de": 456})
res, err := cliente.Enqueue(tarefa, asynq.Queue(FilaNotificacoes))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Colocado na fila com sucesso: %+v\n", res)
// Criar uma tarefa "webhooks:sync" e colocá-la na fila "webhooks".
tarefa = asynq.NewTask("webhooks:sync", map[string]interface{}{"dados": 123})
res, err = cliente.Enqueue(tarefa, asynq.Queue(FilaWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Colocado na fila com sucesso: %+v\n", res)
// Criar uma tarefa "imagens:resize" e colocá-la na fila "imagens".
tarefa = asynq.NewTask("imagens:resize", map[string]interface{}{"src": "algum/caminho/para/imagem"})
res, err = cliente.Enqueue(tarefa, asynq.Queue(FilaImagens))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Colocado na fila com sucesso: %+v\n", res)
}
Vamos executar este programa para colocar as três tarefas nas filas.
go run cliente/cliente.go
Agora, vamos para o trabalhador para processar essas três tarefas. No arquivo trabalhador.go
, vamos criar um asynq.Server
para consumir tarefas dessas três filas. Da mesma forma, iremos conectar ao nosso Redis Cluster usando RedisClusterClientOpt
.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// Defina a mesma prioridade para cada fila aqui
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Falha ao iniciar o servidor: %v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("Enviando um e-mail de %d para %d\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("Manuseando a tarefa de Webhook: %d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("Redimensionando a imagem: %s\n", src)
return nil
}
Vamos executar este servidor de trabalhadores para processar as três tarefas que criamos anteriormente.
go run worker/worker.go
Você deve ser capaz de ver as mensagens impressas de cada manipulador.
Conforme mencionado na visão geral, o Asynq divide os dados com base nas filas. Todas as tarefas adicionadas à mesma fila pertencem ao mesmo nó Redis. Então, qual nó Redis hospeda qual fila?
Podemos usar a CLI para responder a essa pergunta.
asynq queue ls --cluster
Este comando imprimirá uma lista de filas, juntamente com:
- O nó do cluster ao qual a fila pertence
- O slot hash do cluster ao qual a fila está mapeada
A saída pode se parecer com algo assim:
Fila Cluster KeySlot Nós do Cluster
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]