Bu sayfa, Asynq'nun görev birleştirme özelliğini tanıtıyor.
Genel Bakış
Görev birleştirme, görevleri tek tek "İşleyiciye" geçirmek yerine ardışık olarak birden fazla görevi sıraya almanıza olanak tanır. Bu özellik, birden fazla ardışık işlemi bir araya getirerek maliyet tasarrufu sağlamanızı, önbelleği optimize etmenizi veya bildirimleri toplu olarak göndermenizi sağlar.
Çalışma Prensibi
Görev birleştirme özelliğini kullanabilmek için aynı grup adı ile görevleri aynı sıraya almanız gerekir. Aynı (sıra, grup)
çiftini kullanarak sıraya alınan görevler, sağladığınız GroupAggregator
tarafından bir görev haline getirilir ve birleştirilen görev işleyiciye iletilir.
Birleştirilmiş bir görev oluşturulduğunda, Asynq sunucusu, yapılandırılabilir bekleme süresi dolana kadar daha fazla görev bekler. Her yeni görev aynı (sıra, grup)
ile sıraya alındığında, bekleme süresi güncellenir.
Bekleme süresinin yapılandırılabilir bir üst sınırı vardır: maksimum birleştirme gecikme süresini ayarlayabilirsiniz, bu süreden sonra Asynq sunucusu kalan bekleme süresini yok sayacak ve görevleri birleştirecektir.
Ayrıca bir araya getirilebilecek maksimum görev sayısını da belirleyebilirsiniz. Bu sayıya ulaşıldığında, Asynq sunucusu görevleri hemen birleştirir.
Not: Görev zamanlaması ve birleştirme çakışan özelliklerdir, zamanlama birleştirmenin önünde bulunur.
Hızlı Örnek
Müşteri tarafında, aynı gruba görevleri sıraya almak için Queue
ve Group
seçeneklerini kullanın.
// Üç görevi aynı gruba sıraya alın.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
Sunucu tarafında, görev birleştirmeyi etkinleştirmek için bir GroupAggregator
sağlayın. GroupGracePeriod
, GroupMaxDelay
ve GroupMaxSize
'ı yapılandırarak birleştirme stratejisini özelleştirebilirsiniz.
// Bu fonksiyon, birden çok görevi bir göreve birleştirmek için kullanılır.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... Verilen görevleri birleştirmek için mantığınızı kullanın ve birleştirilmiş görevi döndürün.
// ... Gerekirse, YeniGörev(tipadi, payload, opts...) kullanarak yeni bir görev oluşturun ve seçenekleri ayarlayın.
// ... (Not) Queue seçeneği yoksayılacaktır ve birleştirilmiş görev her zaman gruba aynı kuyruğa sıraya alınacaktır.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
Öğretici
Bu bölümde, birleştirme özelliğinin kullanımını gösteren basit bir program sağlıyoruz.
İlk olarak, aşağıdaki kodla bir müşteri programı oluşturun:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis sunucu adresi")
flagMessage = flag.String("message", "hello", "Görev işlenirken yazdırılacak mesaj")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("birleştirme-öğretici", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("öğretici"), asynq.Group("örnek-grup"))
if err != nil {
log.Fatalf("Görevi sıraya alınamadı: %v", err)
}
log.Printf("Görev başarıyla sıraya alındı: %s", info.ID)
}
Bu programı birden çok kez çalıştırabilirsiniz:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
Şimdi, CLI veya Web UI aracılığıyla kuyruğu kontrol ederseniz, kuyrukta birleştirilen görevleri göreceksiniz.
Devamında, aşağıdaki kodla bir sunucu programı oluşturun:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis sunucusunun adresi")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "Gruplar için grace dönemi")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "Gruplar için maksimum gecikme")
flagGroupMaxSize = flag.Int("max-size", 20, "Gruplar için maksimum boyut")
)
// Basit bir birleştirme işlevi.
// Tüm görevlerin mesajlarını, her bir mesajın bir satır kapladığı şekilde tek bir mesajda birleştirir.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("%q grubundan %d görev birleştirildi", group, len(tasks))
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("birleştirilmis-gorev", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("Handler birleştirilmiş görevi aldı")
log.Printf("Birleştirilmiş mesaj: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"öğretici": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("birleştirilmis-gorev", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("Sunucuyu başlatma başarısız oldu: %v", err)
}
}
Bu programı çalıştırabilir ve çıktıyı gözlemleyebilirsiniz:
$ go build -o server server.go
$ ./server --redis-addr=
Çıktıda, sunucunun gruplarda görevleri birleştirdiğini ve işleyicinin birleştirilmiş görevleri işlediğini görmelisiniz. Yukarıdaki programdaki --grace-period
, --max-delay
ve --max-size
bayraklarını değiştirerek birleştirme stratejisini nasıl etkilediklerini görmek için denemeler yapmaktan çekinmeyin.