This page introduces the task aggregation feature of Asynq.

Overview

Task aggregation allows you to queue multiple tasks in succession, rather than passing them one by one to a "Handler". This feature enables you to batch multiple consecutive operations into one, saving costs, optimizing caching, or batch notifications.

Working Principle

To use the task aggregation feature, you need to queue tasks with the same group name into the same queue. The tasks queued using the same (queue, group) pair will be aggregated into one task by the GroupAggregator you provide, and the aggregated task will be passed to the handler.

When creating an aggregated task, the Asynq server will wait for more tasks until the configurable grace period expires. Each time a new task is queued with the same (queue, group), the grace period is updated.

The grace period has a configurable upper limit: you can set the maximum aggregation delay time, after which the Asynq server will ignore the remaining grace period and aggregate the tasks.

You can also set the maximum number of tasks that can be aggregated together. If this number is reached, the Asynq server will immediately aggregate the tasks.

Note: Task scheduling and aggregation are conflicting features, with scheduling taking precedence over aggregation.

Quick Example

On the client side, use the Queue and Group options to enqueue tasks into the same group.

// Enqueue three tasks to the same group.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

On the server side, provide a GroupAggregator to enable task aggregation. You can customize the aggregation strategy by configuring GroupGracePeriod, GroupMaxDelay, and GroupMaxSize.

// This function is used to aggregate multiple tasks into one task.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Your logic to aggregate the given tasks and return the aggregated task.
    // ... If necessary, use NewTask(typename, payload, opts...) to create a new task and set options.
    // ... (Note) the Queue option will be ignored, and the aggregated task will always be queued to the same queue as the group.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Tutorial

In this section, we provide a simple program to demonstrate the use of the aggregation feature.

First, create a client program with the following code:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis server address")
       flagMessage = flag.String("message", "hello", "Message to be printed when processing the task")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("Failed to enqueue task: %v", err)
        }
        log.Printf("Successfully enqueued task: %s", info.ID)
}

You can run this program multiple times:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

Now, if you check the queue through CLI or Web UI, you will see tasks being aggregated in the queue.

Next, create a server program with the following code:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Address of the Redis server")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "Grace period for groups")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "Maximum delay for groups")
	flagGroupMaxSize      = flag.Int("max-size", 20, "Maximum size for groups")
)

// Simple aggregation function.
// Combines messages of all tasks into one, with each message taking up one line.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("Aggregated %d tasks from group %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("Handler received aggregated task")
	log.Printf("Aggregated message: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("aggregated-task", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Failed to start the server: %v", err)
	}
}

You can run this program and observe the output:

$ go build -o server server.go
$ ./server --redis-addr=

You should be able to see in the output that the server has aggregated tasks in the group and the processor has processed the aggregated tasks. Feel free to try changing the --grace-period, --max-delay, and --max-size flags in the above program to see how they affect the aggregation strategy.