Bu sayfa, Asynq'nun görev birleştirme özelliğini tanıtıyor.

Genel Bakış

Görev birleştirme, görevleri tek tek "İşleyiciye" geçirmek yerine ardışık olarak birden fazla görevi sıraya almanıza olanak tanır. Bu özellik, birden fazla ardışık işlemi bir araya getirerek maliyet tasarrufu sağlamanızı, önbelleği optimize etmenizi veya bildirimleri toplu olarak göndermenizi sağlar.

Çalışma Prensibi

Görev birleştirme özelliğini kullanabilmek için aynı grup adı ile görevleri aynı sıraya almanız gerekir. Aynı (sıra, grup) çiftini kullanarak sıraya alınan görevler, sağladığınız GroupAggregator tarafından bir görev haline getirilir ve birleştirilen görev işleyiciye iletilir.

Birleştirilmiş bir görev oluşturulduğunda, Asynq sunucusu, yapılandırılabilir bekleme süresi dolana kadar daha fazla görev bekler. Her yeni görev aynı (sıra, grup) ile sıraya alındığında, bekleme süresi güncellenir.

Bekleme süresinin yapılandırılabilir bir üst sınırı vardır: maksimum birleştirme gecikme süresini ayarlayabilirsiniz, bu süreden sonra Asynq sunucusu kalan bekleme süresini yok sayacak ve görevleri birleştirecektir.

Ayrıca bir araya getirilebilecek maksimum görev sayısını da belirleyebilirsiniz. Bu sayıya ulaşıldığında, Asynq sunucusu görevleri hemen birleştirir.

Not: Görev zamanlaması ve birleştirme çakışan özelliklerdir, zamanlama birleştirmenin önünde bulunur.

Hızlı Örnek

Müşteri tarafında, aynı gruba görevleri sıraya almak için Queue ve Group seçeneklerini kullanın.

// Üç görevi aynı gruba sıraya alın.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

Sunucu tarafında, görev birleştirmeyi etkinleştirmek için bir GroupAggregator sağlayın. GroupGracePeriod, GroupMaxDelay ve GroupMaxSize'ı yapılandırarak birleştirme stratejisini özelleştirebilirsiniz.

// Bu fonksiyon, birden çok görevi bir göreve birleştirmek için kullanılır.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Verilen görevleri birleştirmek için mantığınızı kullanın ve birleştirilmiş görevi döndürün.
    // ... Gerekirse, YeniGörev(tipadi, payload, opts...) kullanarak yeni bir görev oluşturun ve seçenekleri ayarlayın.
    // ... (Not) Queue seçeneği yoksayılacaktır ve birleştirilmiş görev her zaman gruba aynı kuyruğa sıraya alınacaktır.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Öğretici

Bu bölümde, birleştirme özelliğinin kullanımını gösteren basit bir program sağlıyoruz.

İlk olarak, aşağıdaki kodla bir müşteri programı oluşturun:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis sunucu adresi")
       flagMessage = flag.String("message", "hello", "Görev işlenirken yazdırılacak mesaj")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("birleştirme-öğretici", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("öğretici"), asynq.Group("örnek-grup"))
        if err != nil {
                log.Fatalf("Görevi sıraya alınamadı: %v", err)
        }
        log.Printf("Görev başarıyla sıraya alındı: %s", info.ID)
}

Bu programı birden çok kez çalıştırabilirsiniz:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

Şimdi, CLI veya Web UI aracılığıyla kuyruğu kontrol ederseniz, kuyrukta birleştirilen görevleri göreceksiniz.

Devamında, aşağıdaki kodla bir sunucu programı oluşturun:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Redis sunucusunun adresi")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "Gruplar için grace dönemi")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "Gruplar için maksimum gecikme")
	flagGroupMaxSize      = flag.Int("max-size", 20, "Gruplar için maksimum boyut")
)

// Basit bir birleştirme işlevi.
// Tüm görevlerin mesajlarını, her bir mesajın bir satır kapladığı şekilde tek bir mesajda birleştirir.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("%q grubundan %d görev birleştirildi", group, len(tasks))
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("birleştirilmis-gorev", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("Handler birleştirilmiş görevi aldı")
	log.Printf("Birleştirilmiş mesaj: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"öğretici": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("birleştirilmis-gorev", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Sunucuyu başlatma başarısız oldu: %v", err)
	}
}

Bu programı çalıştırabilir ve çıktıyı gözlemleyebilirsiniz:

$ go build -o server server.go
$ ./server --redis-addr=

Çıktıda, sunucunun gruplarda görevleri birleştirdiğini ve işleyicinin birleştirilmiş görevleri işlediğini görmelisiniz. Yukarıdaki programdaki --grace-period, --max-delay ve --max-size bayraklarını değiştirerek birleştirme stratejisini nasıl etkilediklerini görmek için denemeler yapmaktan çekinmeyin.