This page explains how to use Redis Cluster as the message broker for Asynq.
## Advantages of Using Redis Cluster
By using Redis Cluster, you can gain the following advantages:
- Easily shard data across multiple Redis nodes
- Maintain availability in the event of certain node failures
- Automatically perform failover
## Overview
Asynq shards data based on queues. In the diagram above, we have a Redis Cluster with 6 instances (3 masters and 3 slaves) and 4 queues (q1, q2, q3, q4).
- Master1 (and its replica, Slave1) hosts q1 and q2.
- Master2 (and its replica, Slave2) hosts q3.
- Master3 (and its replica, Slave3) hosts q4.
When you use `asynq.Client` to enqueue tasks, you can specify the queue using the `Queue` option. The enqueued tasks will be consumed by `asynq.Server`(s) pulling tasks from these queues.
## Tutorial
This section walks through using Redis Cluster as the message broker for Asynq. We assume that you have a cluster of 6 Redis instances running on ports 7000-7005. Below is an example of the `redis.conf` file for the instance on port 7000:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
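Each of the six nodes needs its own copy of this file with a different `port` value. Because `cluster-config-file` is a relative path, a common approach is to run every node from its own working directory. The following is a minimal Go sketch that generates such files. The `cluster-test/<port>/redis.conf` layout and the helper names `renderConf` and `writeConfs` are assumptions made for illustration; they are not part of Asynq or Redis.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// renderConf returns the redis.conf contents for one cluster node
// listening on the given port. The settings mirror the example above.
func renderConf(port int) string {
	return fmt.Sprintf(`port %d
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
`, port)
}

// writeConfs creates one directory per port under root (a hypothetical
// layout) and writes a redis.conf file into each directory.
func writeConfs(root string, ports []int) error {
	for _, port := range ports {
		dir := filepath.Join(root, fmt.Sprint(port))
		if err := os.MkdirAll(dir, 0o755); err != nil {
			return err
		}
		path := filepath.Join(dir, "redis.conf")
		if err := os.WriteFile(path, []byte(renderConf(port)), 0o644); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ports := []int{7000, 7001, 7002, 7003, 7004, 7005}
	if err := writeConfs("cluster-test", ports); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("wrote", len(ports), "config files")
}
```

After starting `redis-server ./redis.conf` in each directory, the nodes can be joined into a cluster of 3 masters and 3 replicas, for example with `redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1`.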
Next, we will create two binaries: client and worker.
go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go
In `client.go`, we will create a new `asynq.Client` and specify how to connect to the Redis Cluster by passing `RedisClusterClientOpt`.
client := asynq.NewClient(asynq.RedisClusterClientOpt{
	Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
Once we have the client, we will create tasks and enqueue them into three different queues:
- notifications
- webhooks
- images
// client.go
package main

import (
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

// Queue names
const (
	QueueNotifications = "notifications"
	QueueWebhooks      = "webhooks"
	QueueImages        = "images"
)

func main() {
	client := asynq.NewClient(asynq.RedisClusterClientOpt{
		Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
	})
	defer client.Close()

	// Create a "notifications:email" task and enqueue it into the "notifications" queue.
	task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
	res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Successfully enqueued: %+v\n", res)

	// Create a "webhooks:sync" task and enqueue it into the "webhooks" queue.
	task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
	res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Successfully enqueued: %+v\n", res)

	// Create an "images:resize" task and enqueue it into the "images" queue.
	task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
	res, err = client.Enqueue(task, asynq.Queue(QueueImages))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Successfully enqueued: %+v\n", res)
}
Let's run this program to enqueue the three tasks into the queues.
go run client/client.go
Now, let's go to the worker to process these three tasks. In `worker.go`, we will create an `asynq.Server` to consume tasks from these three queues. Similarly, we will connect to our Redis Cluster using `RedisClusterClientOpt`.
// worker.go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{
		Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
	}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// Set the same priority for each queue here
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})

	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Failed to start the server: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("Sending an email from %d to %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Handling Webhook task: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("Resizing the image: %s\n", src)
	return nil
}
Let's run this worker server to handle the three tasks we created earlier.
go run worker/worker.go
You should be able to see the messages printed from each handler.
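The worker above gives all three queues the same priority. In Asynq's default (non-strict) mode, the priority values act as relative weights: a queue's share of processing time is roughly its weight divided by the sum of all weights, and queues with a zero or negative priority are ignored. The sketch below only models that arithmetic with the standard library. The helper name `shares` is hypothetical, and the code is not Asynq's scheduling implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// shares returns, for each queue with a positive priority, the approximate
// fraction of processing time it receives: weight / sum of weights.
// Queues with a zero or negative priority are left out.
func shares(queues map[string]int) map[string]float64 {
	total := 0
	for _, w := range queues {
		if w > 0 {
			total += w
		}
	}
	out := make(map[string]float64, len(queues))
	if total == 0 {
		return out
	}
	for name, w := range queues {
		if w > 0 {
			out[name] = float64(w) / float64(total)
		}
	}
	return out
}

func main() {
	// Same configuration as the worker in this tutorial.
	s := shares(map[string]int{"notifications": 1, "webhooks": 1, "images": 1})

	// Sort the names so the output order is stable.
	names := make([]string, 0, len(s))
	for name := range s {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%-14s %.1f%%\n", name, s[name]*100)
	}
}
```

With equal weights each queue gets about one third of the worker's attention. Changing the map to, say, `{"critical": 6, "default": 3, "low": 1}` would give shares of 60%, 30%, and 10%.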
### Redis Node and Queue
As mentioned in the overview, Asynq shards data based on queues. All tasks added to the same queue are stored on the same Redis node. So, which Redis node hosts which queue?
We can use the CLI to answer this question.
asynq queue ls --cluster
This command will print a list of queues, along with:
- The cluster node to which the queue belongs
- The cluster hash slot to which the queue is mapped
The output might look something like this:
Queue          Cluster KeySlot  Cluster Nodes
images         9450             [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks       4418             [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications  16340            [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]
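To see why every key of a queue lands on one node, it helps to recall how Redis Cluster assigns keys to its 16384 hash slots: the slot is the CRC16 (XMODEM variant) of the key modulo 16384, and when the key contains a non-empty `{...}` hash tag, only the tag is hashed. The sketch below reimplements that rule with the standard library only. It assumes that Asynq wraps the queue name in a hash tag, as in keys of the form `asynq:{<queue>}:pending`. That key layout and the helper names are assumptions made for illustration, so treat the CLI output as authoritative.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes CRC-16/XMODEM (polynomial 0x1021, initial value 0),
// the checksum Redis Cluster uses for key hashing.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashTag returns the part of key that Redis Cluster hashes: the text
// between the first '{' and the next '}' if that text is non-empty,
// otherwise the whole key.
func hashTag(key string) string {
	start := strings.IndexByte(key, '{')
	if start < 0 {
		return key
	}
	end := strings.IndexByte(key[start+1:], '}')
	if end <= 0 { // no closing brace, or an empty tag "{}"
		return key
	}
	return key[start+1 : start+1+end]
}

// keySlot maps a key to one of the 16384 cluster hash slots.
func keySlot(key string) int {
	return int(crc16([]byte(hashTag(key)))) % 16384
}

func main() {
	for _, q := range []string{"images", "webhooks", "notifications"} {
		key := "asynq:{" + q + "}:pending" // assumed key layout
		fmt.Printf("%-14s slot %d\n", q, keySlot(key))
	}
}
```

Under that assumption, every key that carries the same queue name in its hash tag maps to the same slot, which is what keeps all data for a queue on a single master and its replica.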
You can use the command `redis-cli --cluster reshard` to move a queue from one node to another. Note that while resharding is in progress, some operations may be temporarily unavailable because Asynq uses multi-key operations.