이 페이지는 Asynq의 작업 집계 기능을 소개합니다.

개요

작업 집계를 통해 여러 작업을 연속적으로 대기열에 넣어 "핸들러"에 하나씩 전달하는 대신에 대기열에 한꺼번에 여러 작업을 넣을 수 있습니다. 이 기능을 사용하면 여러 연속 작업을 하나로 묶어 비용을 절약하고 캐싱을 최적화하거나 일괄 통지를 할 수 있습니다.

작동 원리

작업 집계 기능을 사용하려면 동일한 그룹 이름을 가진 작업을 동일한 대기열에 넣어야 합니다. 동일한 (대기열, 그룹) 쌍으로 대기열에 넣은 작업은 제공된 **GroupAggregator**에 의해 하나의 작업으로 집계되고 집계된 작업은 핸들러에 전달됩니다.

집계된 작업을 생성할 때 Asynq 서버는 구성 가능한 유예 기간이 만료될 때까지 더 많은 작업을 기다립니다. 동일한 (대기열, 그룹)으로 새 작업을 대기열에 넣을 때마다 유예 기간이 갱신됩니다.

유예 기간에는 구성 가능한 상한선이 있어 최대 집계 지연 시간을 설정할 수 있습니다. 이 시간이 지나면 Asynq 서버는 나머지 유예 기간을 무시하고 작업을 집계합니다.

또한 함께 집계될 수 있는 작업의 최대 수를 설정할 수 있습니다. 이 수에 도달하면 Asynq 서버는 즉시 작업을 집계합니다.

참고: 작업 일정 및 집계는 충돌하는 기능으로, 일정이 집계보다 우선합니다.

빠른 예제

클라이언트 측에서는 QueueGroup 옵션을 사용하여 동일한 그룹에 작업을 인큐합니다.

// 세 가지 작업을 동일한 그룹에 인큐합니다.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

서버 측에서는 GroupAggregator를 제공하여 작업 집계를 활성화합니다. GroupGracePeriod, GroupMaxDelay, 및 GroupMaxSize를 구성하여 집계 전략을 사용자 정의할 수 있습니다.

// 이 함수는 여러 작업을 하나의 작업으로 집계하는 데 사용됩니다.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... 주어진 작업을 집계하고 집계된 작업을 반환하는 논리 코드
    // ... 필요한 경우, NewTask(typename, payload, opts...)를 사용하여 새 작업을 만들고 옵션을 설정합니다.
    // ... (참고) Queue 옵션은 무시되며, 집계된 작업은 항상 그룹과 동일한 큐에 인큐됩니다.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

튜토리얼

이 섹션에서는 집계 기능의 사용을 보여주는 간단한 프로그램을 제공합니다.

먼저, 다음의 코드로 클라이언트 프로그램을 생성합니다:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
        flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis server address")
        flagMessage = flag.String("message", "hello", "처리되는 작업에 출력될 메시지")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("작업을 인큐하는 데 실패했습니다: %v", err)
        }
        log.Printf("작업을 성공적으로 인큐했습니다: %s", info.ID)
}

이 프로그램을 여러 번 실행할 수 있습니다:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

이제 CLI 또는 Web UI를 통해 큐를 확인하면 대기열에서 작업이 집계되는 것을 확인할 수 있습니다.

다음으로, 다음과 같은 코드로 서버 프로그램을 생성합니다:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Redis 서버 주소")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "그룹용 우아한 기간")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "그룹용 최대 지연 시간")
	flagGroupMaxSize      = flag.Int("max-size", 20, "그룹용 최대 크기")
)

// 간단한 집계 함수.
// 각 작업의 메시지를 한 줄씩 가져와 하나로 합침
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("%q 그룹에서 %d 작업을 집계함", group, len(tasks))
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("핸들러가 집계된 작업을 수신했습니다")
	log.Printf("집계된 메시지: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			그룹집계기:       asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("aggregated-task", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("서버 시작에 실패했습니다: %v", err)
	}
}

위의 프로그램을 실행하고 출력을 관찰할 수 있습니다.

$ go build -o server server.go
$ ./server --redis-addr=

출력에서 서버가 그룹에서 작업을 집계하고 프로세서가 집계된 작업을 처리한 것을 확인할 수 있어야 합니다. 위의 프로그램에서 --grace-period, --max-delay, 및 --max-size 플래그를 변경하여 집계 전략에 어떻게 영향을 미치는지 확인해 보세요.