Esta página presenta la característica de agregación de tareas de Asynq.
Descripción general
La agregación de tareas le permite encolar múltiples tareas en sucesión, en lugar de pasarlas una por una a un "Manejador". Esta función le permite agrupar múltiples operaciones consecutivas en una sola, ahorrando costos, optimizando el almacenamiento en caché o realizando notificaciones por lotes.
Principio de funcionamiento
Para usar la función de agregación de tareas, debe encolar tareas con el mismo nombre de grupo en la misma cola. Las tareas encoladas usando el mismo par (cola, grupo)
serán agregadas en una sola tarea por el Agregador de Grupo
que proporcione, y la tarea agregada se pasará al manejador.
Cuando se crea una tarea agregada, el servidor Asynq esperará más tareas hasta que expire el período de gracia configurable. Cada vez que se encola una nueva tarea con el mismo (cola, grupo)
, se actualiza el período de gracia.
El período de gracia tiene un límite superior configurable: puede establecer el tiempo máximo de demora de agregación, después del cual el servidor Asynq ignorará el período de gracia restante y agregará las tareas.
También puede establecer el número máximo de tareas que pueden ser agregadas juntas. Si se alcanza este número, el servidor Asynq agregará inmediatamente las tareas.
Nota: La programación de tareas y la agregación son funciones conflictivas, con la programación teniendo prioridad sobre la agregación.
Ejemplo rápido
En el lado del cliente, use las opciones Queue
y Group
para encolar tareas en el mismo grupo.
// Encola tres tareas en el mismo grupo.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
En el lado del servidor, proporcione un GroupAggregator
para habilitar la agregación de tareas. Puede personalizar la estrategia de agregación configurando GroupGracePeriod
, GroupMaxDelay
y GroupMaxSize
.
// Esta función se utiliza para agregar múltiples tareas en una sola tarea.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... Su lógica para agregar las tareas dadas y devolver la tarea agregada.
// ... Si es necesario, use NewTask(typename, payload, opts...) para crear una nueva tarea y establecer opciones.
// ... (Nota) la opción de Queue será ignorada, y la tarea agregada siempre se encolará en la misma cola que el grupo.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
Tutorial
En esta sección, proporcionamos un programa simple para demostrar el uso de la función de agregación.
Primero, cree un programa cliente con el siguiente código:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Dirección del servidor Redis")
flagMessage = flag.String("message", "hello", "Mensaje a imprimir al procesar la tarea")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("Error al encolar la tarea: %v", err)
}
log.Printf("Tarea encolada correctamente: %s", info.ID)
}
Puede ejecutar este programa varias veces:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
Ahora, si revisa la cola a través de la CLI o la interfaz web, verá que las tareas se están agregando en la cola.
A continuación, cree un programa servidor con el siguiente código:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Dirección del servidor de Redis")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "Período de gracia para grupos")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "Retardo máximo para grupos")
flagGroupMaxSize = flag.Int("max-size", 20, "Tamaño máximo para grupos")
)
// Función de agregación simple.
// Combina los mensajes de todas las tareas en uno, con cada mensaje ocupando una línea.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("Tareas agregadas %d del grupo %q", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("tarea-agregada", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("El manejador recibió la tarea agregada")
log.Printf("Mensaje agregado: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("tarea-agregada", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Error al iniciar el servidor: %v", err)
}
}
Puede ejecutar este programa y observar la salida:
$ go build -o server server.go
$ ./server --redis-addr=
Debería poder ver en la salida que el servidor ha agregado tareas en el grupo y que el procesador ha procesado las tareas agregadas. Siéntase libre de intentar cambiar las banderas --grace-period
, --max-delay
y --max-size
en el programa anterior para ver cómo afectan la estrategia de agregación.