Esta página apresenta a funcionalidade de agregação de tarefas do Asynq.
Visão geral
A agregação de tarefas permite enfileirar várias tarefas sucessivamente, em vez de passá-las uma por uma para um "Handler". Esta funcionalidade permite agrupar várias operações consecutivas em uma só, economizando custos, otimizando o caching ou notificações em lote.
Princípio de funcionamento
Para usar a funcionalidade de agregação de tarefas, é necessário enfileirar as tarefas com o mesmo nome do grupo na mesma fila. As tarefas enfileiradas usando o mesmo par (fila, grupo)
serão agregadas em uma única tarefa pelo GroupAggregator
que você fornecer, e a tarefa agregada será encaminhada para o manipulador.
Ao criar uma tarefa agregada, o servidor Asynq aguardará mais tarefas até que o período de tolerância configurável expire. Cada vez que uma nova tarefa é enfileirada com o mesmo (fila, grupo)
, o período de tolerância é atualizado.
O período de tolerância tem um limite superior configurável: é possível definir o tempo máximo de atraso de agregação, após o qual o servidor Asynq ignorará o restante do período de tolerância e agregará as tarefas.
Também é possível definir o número máximo de tarefas que podem ser agregadas juntas. Se esse número for atingido, o servidor Asynq agregará imediatamente as tarefas.
Nota: O agendamento e a agregação de tarefas são funcionalidades conflitantes, com o agendamento tendo precedência sobre a agregação.
Exemplo Rápido
No lado do cliente, use as opções Queue
e Group
para enfileirar tarefas no mesmo grupo.
// Enfileirar três tarefas no mesmo grupo.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
No lado do servidor, forneça um GroupAggregator
para habilitar a agregação de tarefas. Você pode personalizar a estratégia de agregação configurando GroupGracePeriod
, GroupMaxDelay
e GroupMaxSize
.
// Esta função é usada para agregar várias tarefas em uma.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... Sua lógica para agregar as tarefas fornecidas e retornar a tarefa agregada.
// ... Se necessário, use NewTask(typename, payload, opts...) para criar uma nova tarefa e definir opções.
// ... (Nota) a opção Queue será ignorada, e a tarefa agregada sempre será enfileirada na mesma fila que o grupo.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
Tutorial
Nesta seção, fornecemos um programa simples para demonstrar o uso do recurso de agregação.
Primeiro, crie um programa cliente com o seguinte código:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Endereço do servidor Redis")
flagMessage = flag.String("message", "hello", "Mensagem a ser exibida ao processar a tarefa")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("Falha ao enfileirar a tarefa: %v", err)
}
log.Printf("Tarefa enfileirada com sucesso: %s", info.ID)
}
Você pode executar este programa várias vezes:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
Agora, se você verificar a fila por meio do CLI ou da interface da Web, verá tarefas sendo agregadas na fila.
A seguir, crie um programa servidor com o seguinte código:
// server.go
pacote principal
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Endereço do servidor Redis")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "Período de tolerância para grupos")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "Atraso máximo para grupos")
flagGroupMaxSize = flag.Int("max-size", 20, "Tamanho máximo para grupos")
)
// Função de agregação simples.
// Combina mensagens de todas as tarefas em uma só, com cada mensagem ocupando uma linha.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("Agregadas %d tarefas do grupo %q", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("tarefa-agregada", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("Handler recebeu a tarefa agregada")
log.Printf("Mensagem agregada: %s", task.Payload())
return nil
}
func principal() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("tarefa-agregada", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Falha ao iniciar o servidor: %v", err)
}
}
Você pode executar este programa e observar a saída:
$ go build -o server server.go
$ ./server --redis-addr=
Você deverá ver na saída que o servidor agregou tarefas no grupo e o processador processou as tarefas agregadas. Sinta-se à vontade para tentar alterar as flags --grace-period
, --max-delay
e --max-size
no programa acima para ver como elas afetam a estratégia de agregação.