이 페이지는 Asynq의 작업 집계 기능을 소개합니다.
개요
작업 집계를 통해 여러 작업을 연속적으로 대기열에 넣어 "핸들러"에 하나씩 전달하는 대신에 대기열에 한꺼번에 여러 작업을 넣을 수 있습니다. 이 기능을 사용하면 여러 연속 작업을 하나로 묶어 비용을 절약하고 캐싱을 최적화하거나 일괄 통지를 할 수 있습니다.
작동 원리
작업 집계 기능을 사용하려면 동일한 그룹 이름을 가진 작업을 동일한 대기열에 넣어야 합니다. 동일한 (대기열, 그룹)
쌍으로 대기열에 넣은 작업은 제공된 **GroupAggregator
**에 의해 하나의 작업으로 집계되고 집계된 작업은 핸들러에 전달됩니다.
집계된 작업을 생성할 때 Asynq 서버는 구성 가능한 유예 기간이 만료될 때까지 더 많은 작업을 기다립니다. 동일한 (대기열, 그룹)
으로 새 작업을 대기열에 넣을 때마다 유예 기간이 갱신됩니다.
유예 기간에는 구성 가능한 상한선이 있어 최대 집계 지연 시간을 설정할 수 있습니다. 이 시간이 지나면 Asynq 서버는 나머지 유예 기간을 무시하고 작업을 집계합니다.
또한 함께 집계될 수 있는 작업의 최대 수를 설정할 수 있습니다. 이 수에 도달하면 Asynq 서버는 즉시 작업을 집계합니다.
참고: 작업 일정 및 집계는 충돌하는 기능으로, 일정이 집계보다 우선합니다.
빠른 예제
클라이언트 측에서는 Queue
및 Group
옵션을 사용하여 동일한 그룹에 작업을 인큐합니다.
// 세 가지 작업을 동일한 그룹에 인큐합니다.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
서버 측에서는 GroupAggregator
를 제공하여 작업 집계를 활성화합니다. GroupGracePeriod
, GroupMaxDelay
, 및 GroupMaxSize
를 구성하여 집계 전략을 사용자 정의할 수 있습니다.
// 이 함수는 여러 작업을 하나의 작업으로 집계하는 데 사용됩니다.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... 주어진 작업을 집계하고 집계된 작업을 반환하는 논리 코드
// ... 필요한 경우, NewTask(typename, payload, opts...)를 사용하여 새 작업을 만들고 옵션을 설정합니다.
// ... (참고) Queue 옵션은 무시되며, 집계된 작업은 항상 그룹과 동일한 큐에 인큐됩니다.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
튜토리얼
이 섹션에서는 집계 기능의 사용을 보여주는 간단한 프로그램을 제공합니다.
먼저, 다음의 코드로 클라이언트 프로그램을 생성합니다:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis server address")
flagMessage = flag.String("message", "hello", "처리되는 작업에 출력될 메시지")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("작업을 인큐하는 데 실패했습니다: %v", err)
}
log.Printf("작업을 성공적으로 인큐했습니다: %s", info.ID)
}
이 프로그램을 여러 번 실행할 수 있습니다:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
이제 CLI 또는 Web UI를 통해 큐를 확인하면 대기열에서 작업이 집계되는 것을 확인할 수 있습니다.
다음으로, 다음과 같은 코드로 서버 프로그램을 생성합니다:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis 서버 주소")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "그룹용 우아한 기간")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "그룹용 최대 지연 시간")
flagGroupMaxSize = flag.Int("max-size", 20, "그룹용 최대 크기")
)
// 간단한 집계 함수.
// 각 작업의 메시지를 한 줄씩 가져와 하나로 합침
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("%q 그룹에서 %d 작업을 집계함", group, len(tasks))
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("aggregated-task", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("핸들러가 집계된 작업을 수신했습니다")
log.Printf("집계된 메시지: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
그룹집계기: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("aggregated-task", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("서버 시작에 실패했습니다: %v", err)
}
}
위의 프로그램을 실행하고 출력을 관찰할 수 있습니다.
$ go build -o server server.go
$ ./server --redis-addr=
출력에서 서버가 그룹에서 작업을 집계하고 프로세서가 집계된 작업을 처리한 것을 확인할 수 있어야 합니다. 위의 프로그램에서 --grace-period
, --max-delay
, 및 --max-size
플래그를 변경하여 집계 전략에 어떻게 영향을 미치는지 확인해 보세요.