Esta página apresenta a funcionalidade de agregação de tarefas do Asynq.

Visão geral

A agregação de tarefas permite enfileirar várias tarefas sucessivamente, em vez de passá-las uma por uma para um "Handler". Esta funcionalidade permite agrupar várias operações consecutivas em uma só, economizando custos, otimizando o caching ou notificações em lote.

Princípio de funcionamento

Para usar a funcionalidade de agregação de tarefas, é necessário enfileirar as tarefas com o mesmo nome do grupo na mesma fila. As tarefas enfileiradas usando o mesmo par (fila, grupo) serão agregadas em uma única tarefa pelo GroupAggregator que você fornecer, e a tarefa agregada será encaminhada para o manipulador.

Ao criar uma tarefa agregada, o servidor Asynq aguardará mais tarefas até que o período de tolerância configurável expire. Cada vez que uma nova tarefa é enfileirada com o mesmo (fila, grupo), o período de tolerância é atualizado.

O período de tolerância tem um limite superior configurável: é possível definir o tempo máximo de atraso de agregação, após o qual o servidor Asynq ignorará o restante do período de tolerância e agregará as tarefas.

Também é possível definir o número máximo de tarefas que podem ser agregadas juntas. Se esse número for atingido, o servidor Asynq agregará imediatamente as tarefas.

Nota: O agendamento e a agregação de tarefas são funcionalidades conflitantes, com o agendamento tendo precedência sobre a agregação.

Exemplo Rápido

No lado do cliente, use as opções Queue e Group para enfileirar tarefas no mesmo grupo.

// Enfileirar três tarefas no mesmo grupo.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

No lado do servidor, forneça um GroupAggregator para habilitar a agregação de tarefas. Você pode personalizar a estratégia de agregação configurando GroupGracePeriod, GroupMaxDelay e GroupMaxSize.

// Esta função é usada para agregar várias tarefas em uma.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Sua lógica para agregar as tarefas fornecidas e retornar a tarefa agregada.
    // ... Se necessário, use NewTask(typename, payload, opts...) para criar uma nova tarefa e definir opções.
    // ... (Nota) a opção Queue será ignorada, e a tarefa agregada sempre será enfileirada na mesma fila que o grupo.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:   	 20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Tutorial

Nesta seção, fornecemos um programa simples para demonstrar o uso do recurso de agregação.

Primeiro, crie um programa cliente com o seguinte código:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Endereço do servidor Redis")
       flagMessage = flag.String("message", "hello", "Mensagem a ser exibida ao processar a tarefa")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("Falha ao enfileirar a tarefa: %v", err)
        }
        log.Printf("Tarefa enfileirada com sucesso: %s", info.ID)
}

Você pode executar este programa várias vezes:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

Agora, se você verificar a fila por meio do CLI ou da interface da Web, verá tarefas sendo agregadas na fila.

A seguir, crie um programa servidor com o seguinte código:

// server.go
pacote principal

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Endereço do servidor Redis")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "Período de tolerância para grupos")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "Atraso máximo para grupos")
	flagGroupMaxSize      = flag.Int("max-size", 20, "Tamanho máximo para grupos")
)

// Função de agregação simples.
// Combina mensagens de todas as tarefas em uma só, com cada mensagem ocupando uma linha.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("Agregadas %d tarefas do grupo %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("tarefa-agregada", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("Handler recebeu a tarefa agregada")
	log.Printf("Mensagem agregada: %s", task.Payload())
	return nil
}

func principal() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("tarefa-agregada", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Falha ao iniciar o servidor: %v", err)
	}
}

Você pode executar este programa e observar a saída:

$ go build -o server server.go
$ ./server --redis-addr=

Você deverá ver na saída que o servidor agregou tarefas no grupo e o processador processou as tarefas agregadas. Sinta-se à vontade para tentar alterar as flags --grace-period, --max-delay e --max-size no programa acima para ver como elas afetam a estratégia de agregação.