Başlangıç Kılavuzu
Bu rehberde, client
ve workers
adlı iki program yazacağız.
-
client.go
, arka planda çalışan işçi iş parçacıkları tarafından aynı anda işlenen görevleri oluşturacak ve planlayacak. -
workers.go
, istemci tarafından oluşturulan görevleri ele almak için birden fazla eşzamanlı işçi iş parçacığını başlatacak.
Bu kılavuz, localhost:6379
üzerinde bir Redis sunucusunun çalıştırıldığını varsayar. Başlamadan önce lütfen Redis'in kurulu ve çalışır durumda olduğundan emin olun.
İlk olarak, iki ana dosyamızı oluşturalım.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
Ardından, asynq
paketini yükleyin.
go get -u github.com/hibiken/asynq
Kod yazmaya başlamadan önce, bu iki programda kullanılacak bazı temel tipleri inceleyelim.
Redis Bağlantı Seçenekleri
Asynq, bir mesaj aracı olarak Redis'i kullanır. client.go
ve workers.go
, okuma ve yazma işlemleri için Redis ile bağlantı kurmalıdır. Yerel olarak çalışan Redis sunucusu için bağlantıyı belirtmek için RedisClientOpt
kullanacağız.
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// Gerekli değilse şifre atlanabilir
Password: "mypassword",
// Asynq için ayrılmış bir veritabanı numarası kullanın.
// Varsayılan olarak, Redis 16 veritabanı sağlar (0 ile 15 arası).
DB: 0,
}
Görevler
asynq
da, iş birimleri Task
adlı bir tipte kapsüllenmiştir ve kavramsal olarak Type
ve Payload
olmak üzere iki alanı vardır.
// Type, görevin türünü belirten bir dize değeridir.
func (t *Task) Type() string
// Payload, görevin yürütülmesi için gerekli veridir.
func (t *Task) Payload() []byte
Temel tipleri incelediğimize göre, programlarımızı yazmaya başlayalım.
İstemci Programı
client.go
dosyasında, asynq.Client
kullanarak birkaç görev oluşturacağız ve bunları kuyruğa alacağız.
Bir görev oluşturmak için NewTask
fonksiyonunu kullanabilir ve görevin türünü ve içeriğini iletebilirsiniz.
Enqueue
yöntemi, bir görevi ve isteğe bağlı olarak bir dizi seçeneği alır. Görevleri gelecekteki işlemler için planlamak için ProcessIn
veya ProcessAt
seçeneklerini kullanın.
// E-posta görevleriyle ilgili payload.
type EmailTaskPayload struct {
// E-posta alıcısının ID'si.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// Tür adı ve payload ile bir görev oluşturun.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// Görevleri hemen işleyin.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Görev başarıyla kuyruğa alındı: %+v", info)
// Görevleri 24 saat sonra işleyin.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Görev başarıyla kuyruğa alındı: %+v", info)
}
İşte istemci programımız için gerekenler.
Çalışanlar Programı
workers.go
dosyasında, çalışanları başlatmak için bir asynq.Server
örneği oluşturacağız.
NewServer
fonksiyonu, RedisConnOpt
ve Config
parametrelerini alır.
Config
, sunucunun görev işleme davranışını ayarlamak için kullanılır.
Tüm kullanılabilir yapılandırma seçeneklerini öğrenmek için Config
belgelerine başvurabilirsiniz.
Basitlik açısından, bu örnekte yalnızca paralellik derecesini belirtiyoruz.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Not: Aşağıdaki bölümde, 'handler'ın ne olduğunu tanıtacağız.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
(*Server).Run
metodunun parametresi olan arayüz asynq.Handler
, ProcessTask
methoduna sahiptir.
type Handler interface {
// Görev başarıyla işlenirse, ProcessTask nil dönmelidir.
// ProcessTask nil olmayan bir hata döndürürse veya bir panik oluşturursa, görev daha sonra yeniden denenir.
ProcessTask(context.Context, *Task) error
}
Handler'ı uygulamanın en basit yolu, aynı imzaya sahip bir fonksiyon tanımlamak ve onu Run
'a iletilirken asynq.HandlerFunc
adaptör türünü kullanmaktır.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Kullanıcıya hoş geldiniz e-postası gönderiliyor %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Hatırlatma e-postası kullanıcıya gönderiliyor %d", p.UserID)
default:
return fmt.Errorf("Beklenmeyen görev türü: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// işlevi işlemek için asynq.HandlerFunc adaptörünü kullanın
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
Bu handler fonksiyonu için switch case'lerini eklemeye devam edebiliriz, ancak gerçek bir uygulamada, her durum için mantığı ayrı bir işlevde tanımlamak daha uygun olacaktır.
Kodumuzu yeniden yapılandırmak için, handler'ımızı oluşturmak için ServeMux
'u kullanalım. "net/http"
paketinden ServeMux
gibi, Handle
veya HandleFunc
çağrılarıyla bir işleyiciyi kaydedebilirsiniz. ServeMux
, Handler
arayüzünü karşıladığından, (*Server).Run
'a iletilir.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Kullanıcıya hoş geldiniz e-postası gönderiliyor %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Hatırlatma e-postası kullanıcıya gönderiliyor %d", p.UserID)
return nil
}
Şimdi, her bir görev türü için işleme işlevlerini çıkardığımıza göre, kod daha düzenli görünüyor. Bununla birlikte, kod hala biraz belirsiz. Görev türleri ve payload türleri için bu dize değerlerine sahibiz ve bunları organik bir pakette kapsamlandırmalıyız. Kodumuzu yeniden yapılandıralım ve görev oluşturmayı ve işlemeyi kapsayan bir paket yazalım. Basitçe, task
adında bir paket oluşturuyoruz.
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// List of task types.
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// Payload for any task related to emails.
type EmailTaskPayload struct {
// ID of the email recipient.
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Kullanıcıya hoş geldiniz e-postası gönderiliyor: %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Hatırlatma e-postası kullanıcıya gönderiliyor: %d", p.UserID)
return nil
}
Şimdi client.go
ve workers.go
dosyalarında bu paketi içe aktarabiliriz.
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// Görevi hemen kuyruğa ekleyin.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Görev başarıyla kuyruğa eklendi: %+v", info)
// Görevi 24 saat sonra işlenmek üzere kuyruğa ekleyin.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Görev başarıyla kuyruğa eklendi: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
Kod şimdi daha iyi görünüyor!
Artık client
ve workers
hazır olduğuna göre, bu iki programı çalıştırabiliriz. İlk olarak, görevleri oluşturup planlamak için client
programını çalıştırarak başlayalım.
go run client/client.go
Bu, hemen işlenmesi için bir görev ve 24 saat sonra işlenmek üzere başka bir görev oluşturacaktır.
Görevleri incelemek için asynq
komut satırı arayüzünü kullanalım.
asynq dash
Enqueued durumunda bir görev ve Scheduled durumunda başka bir görev görmelisiniz.
Not: Her durumun anlamını anlamak için lütfen Görev Yaşam Döngüsü sayfasına başvurunuz.
Son olarak, görevleri işlemek üzere workers
programını başlatalım.
go run workers/workers.go
Not: Bu program, onu sonlandırana kadar çıkmayacaktır. Arkaplan işçilerini güvenli bir şekilde sonlandırmak için en iyi uygulamalar için lütfen Sinyal Wiki sayfasına başvurunuz.
Başarılı görev işleme işlemini belirten bir metin çıkışı terminalde görmelisiniz.
client
programını yeniden çalıştırarak işçilerin nasıl kabul edip işlediğini görebilirsiniz.
Bir görevin ilk denemede başarılı bir şekilde işlenemediği durumlar sık görülebilir. Varsayılan olarak, başarısız görevler üstel geriye doğru 25 kez yeniden denenecektir. Başarısız bir durumu simüle etmek için işleyicimizi güncelleyelim ve bir hata döndürelim.
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Kullanıcıya hoş geldin e-postası göndermeye çalışılıyor %d...", p.UserID)
return fmt.Errorf("Kullanıcıya e-posta gönderilemedi")
}
Çalışan programımızı yeniden başlatalım ve bir görevi sıraya alalım.
go run workers/workers.go
go run client/client.go
Eğer asynq dash
çalıştırıyorsanız, Retry durumunda bir görevi ("retry" sekmesini vurgulayarak) görüntüleyebilmelisiniz.
Retry durumunda olan görevleri kontrol etmek için aynı zamanda şunu da çalıştırabilirsiniz:
asynq task ls --queue=default --state=retry
Bu, gelecekte yeniden denenecek tüm görevleri listeler. Çıktı, her görevin bir sonraki çalıştırma zamanını da içerir.
Bir görevin yeniden deneme girişimlerini tüketmesi durumunda, Archived durumuna geçer ve bir daha yeniden denemez (arşivlenmiş görevleri hala CLI veya WebUI araçları kullanarak manuel olarak çalıştırabilirsiniz).
Bu kılavuzu tamamlamadan önce, işleyicimizi düzeltelim.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Kullanıcıya hoş geldin e-postası gönderiliyor %d", p.UserID)
return nil
}