Эта страница представляет собой руководство по использованию Redis Cluster в качестве посредника сообщений в Asynq.
Преимущества использования Redis Cluster
Используя Redis Cluster, вы получаете следующие преимущества:
- Легко распределять данные по нескольким узлам Redis
- Сохранение доступности в случае отказа определенных узлов
- Автоматически выполнять отказоустойчивость
Обзор
Asynq распределяет данные на основе очередей. На диаграмме выше показан Redis Cluster с 6 экземплярами (3 мастера и 3 реплики) и 4 очередями (q1, q2, q3, q4).
- Мастер1 (и его реплика, Slave1) размещает q1 и q2.
- Мастер2 (и его реплика, Slave2) размещает q3.
- Мастер3 (и его реплика, Slave3) размещает q4.
При использовании asynq.Client
для постановки задач в очередь, можно указать очередь с помощью параметра Queue
. Задачи, поставленные в очередь, будут обрабатываться сервером asynq.Server
, который забирает задачи из этих очередей.
Руководство
В этом разделе мы расскажем, как использовать кластер Redis в качестве посредника сообщений для Asynq. Мы предполагаем, что у вас есть кластер из 6 экземпляров Redis, работающих на портах 7000-7005. Ниже приведен пример файла redis.conf
:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
Затем мы создадим два исполняемых файла: клиент и воркер.
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
В файле client.go
мы создадим новый asynq.Client
и укажем, как подключаться к кластеру Redis, передавая RedisClusterClientOpt
.
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
После создания клиента мы будем создавать задачи и помещать их в три разные очереди:
- уведомления
- вебхуки
- изображения
// client.go
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// Имена очередей
const (
QueueNotifications = "notifications"
QueueWebhooks = "webhooks"
QueueImages = "images"
)
func main() {
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
defer client.Close()
// Создаем задачу "notifications:email" и помещаем ее в очередь "notifications".
task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Успешно помещено в очередь: %+v\n", res)
// Создаем задачу "webhooks:sync" и помещаем ее в очередь "webhooks".
task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Успешно помещено в очередь: %+v\n", res)
// Создаем задачу "images:resize" и помещаем ее в очередь "images".
task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
res, err = client.Enqueue(task, asynq.Queue(QueueImages))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Успешно помещено в очередь: %+v\n", res)
}
Теперь запустим эту программу, чтобы поместить три задачи в очереди.
go run client/client.go
Теперь перейдем к воркеру, чтобы обработать эти три задачи. В файле worker.go
мы создадим asynq.Server
, чтобы потреблять задачи из этих трех очередей. Аналогично, мы подключимся к нашему кластеру Redis, используя RedisClusterClientOpt
.
// worker.go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 20,
// Установите здесь одинаковый приоритет для каждой очереди
Queues: map[string]int{
"notifications": 1,
"webhooks": 1,
"images": 1,
},
})
mux := asynq.NewServeMux()
mux.HandleFunc("notifications:email", handleEmailTask)
mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
mux.HandleFunc("images:resize", handleImageResizeTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Не удалось запустить сервер: %v", err)
}
}
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
to, err := t.Payload.GetInt("to")
if err != nil {
return err
}
from, err := t.Payload.GetInt("from")
if err != nil {
return err
}
fmt.Printf("Отправка письма с %d на %d\n", from, to)
return nil
}
func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
data, err := t.Payload.GetInt("data")
if err != nil {
return err
}
fmt.Printf("Обработка задачи вебхука: %d\n", data)
return nil
}
func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
fmt.Printf("Изменение размера изображения: %s\n", src)
return nil
}
Давайте запустим этот рабочий сервер для обработки трех задач, которые мы создали ранее.
go run worker/worker.go
Вы должны увидеть сообщения, напечатанные из каждого обработчика.
Как упоминалось в обзоре, Asynq разбивает данные на основе очередей. Все задачи, добавленные в одну очередь, принадлежат одному узлу Redis. Так, какой узел Redis хранит какую очередь?
Мы можем использовать интерфейс командной строки (CLI), чтобы ответить на этот вопрос.
asynq queue ls --cluster
Эта команда выведет список очередей вместе с:
- Узел кластера, к которому принадлежит очередь
- Хэш-слот кластера, к которому отображается очередь
Результат может выглядеть примерно так:
Queue Cluster KeySlot Cluster Nodes
images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]