Halaman ini memperkenalkan fitur penggabungan tugas dari Asynq.

Ikhtisar

Penggabungan tugas memungkinkan Anda untuk mengantrikan beberapa tugas secara berurutan, daripada melewatkan satu per satu ke "Handler". Fitur ini memungkinkan Anda untuk menggabungkan beberapa operasi berturut-turut menjadi satu, menghemat biaya, mengoptimalkan caching, atau menggabungkan pemberitahuan.

Prinsip Kerja

Untuk menggunakan fitur penggabungan tugas, Anda perlu mengantrikan tugas dengan nama grup yang sama ke dalam antrian yang sama. Tugas-tugas yang diantrikan menggunakan pasangan (antrian, grup) yang sama akan digabungkan menjadi satu tugas oleh GroupAggregator yang Anda sediakan, dan tugas yang telah digabungkan akan dilewatkan ke handler.

Saat membuat tugas yang telah digabungkan, server Asynq akan menunggu lebih banyak tugas hingga periode toleransi yang dapat dikonfigurasi habis. Setiap kali tugas baru diantrikan dengan pasangan (antrian, grup) yang sama, periode toleransi akan diperbarui.

Periode toleransi memiliki batas atas yang dapat dikonfigurasi: Anda dapat mengatur waktu penundaan penggabungan maksimum, setelah itu server Asynq akan mengabaikan sisa periode toleransi dan menggabungkan tugas-tugas tersebut.

Anda juga dapat mengatur jumlah maksimum tugas yang dapat digabungkan bersama. Jika jumlah ini tercapai, server Asynq akan segera menggabungkan tugas-tugas tersebut.

Catatan: Penjadwalan tugas dan penggabungan adalah fitur yang bertentangan, dengan penjadwalan lebih diutamakan daripada penggabungan.

Contoh Cepat

Pada sisi klien, gunakan opsi Queue dan Group untuk mengantre tugas ke dalam grup yang sama.

// Mengantre tiga tugas ke dalam grup yang sama.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

Pada sisi server, sediakan GroupAggregator untuk mengaktifkan penggabungan tugas. Anda dapat menyesuaikan strategi penggabungan dengan mengonfigurasi GroupGracePeriod, GroupMaxDelay, dan GroupMaxSize.

// Fungsi ini digunakan untuk menggabungkan beberapa tugas menjadi satu tugas.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Logika Anda untuk menggabungkan tugas yang diberikan dan mengembalikan tugas yang telah digabungkan.
    // ... Jika perlu, gunakan NewTask(namajenis, payload, opts...) untuk membuat tugas baru dan mengatur opsi.
    // ... (Catatan) opsi Queue akan diabaikan, dan tugas yang digabungkan akan selalu diantre ke antrian yang sama dengan grupnya.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Tutorial

Pada bagian ini, kami menyediakan program sederhana untuk menunjukkan penggunaan fitur penggabungan.

Pertama, buat program klien dengan kode berikut:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Alamat server Redis")
       flagMessage = flag.String("message", "hello", "Pesan yang akan dicetak saat memproses tugas")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("Gagal mengantre tugas: %v", err)
        }
        log.Printf("Berhasil mengantre tugas: %s", info.ID)
}

Anda dapat menjalankan program ini beberapa kali:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

Sekarang, jika Anda memeriksa antrian melalui CLI atau antarmuka pengguna web, Anda akan melihat tugas-tugas digabungkan dalam antrian.

Selanjutnya, buat program server dengan kode berikut:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Alamat server Redis")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "Periode penundaan untuk grup")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "Penundaan maksimum untuk grup")
	flagGroupMaxSize      = flag.Int("max-size", 20, "Ukuran maksimum untuk grup")
)

// Fungsi agregasi sederhana.
// Menggabungkan pesan dari semua tugas menjadi satu, dengan setiap pesan mengambil satu baris.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("Menggabungkan %d tugas dari grup %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("tugas-terakumulasi", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("Penangan menerima tugas yang terakumulasi")
	log.Printf("Pesan yang terakumulasi: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("tugas-terakumulasi", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Gagal memulai server: %v", err)
	}
}

Anda dapat menjalankan program ini dan mengamati outputnya:

$ go build -o server server.go
$ ./server --redis-addr=

Anda seharusnya bisa melihat dalam output bahwa server telah mengagregasi tugas dalam grup dan prosesor telah memproses tugas yang terakumulasi. Silakan mencoba mengubah --grace-period, --max-delay, dan --max-size di program di atas untuk melihat bagaimana mereka memengaruhi strategi agregasi.