Başlangıç Kılavuzu

Bu rehberde, client ve workers adlı iki program yazacağız.

  • client.go, arka planda çalışan işçi iş parçacıkları tarafından aynı anda işlenen görevleri oluşturacak ve planlayacak.
  • workers.go, istemci tarafından oluşturulan görevleri ele almak için birden fazla eşzamanlı işçi iş parçacığını başlatacak.

Bu kılavuz, localhost:6379 üzerinde bir Redis sunucusunun çalıştırıldığını varsayar. Başlamadan önce lütfen Redis'in kurulu ve çalışır durumda olduğundan emin olun.

İlk olarak, iki ana dosyamızı oluşturalım.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Ardından, asynq paketini yükleyin.

go get -u github.com/hibiken/asynq

Kod yazmaya başlamadan önce, bu iki programda kullanılacak bazı temel tipleri inceleyelim.

Redis Bağlantı Seçenekleri

Asynq, bir mesaj aracı olarak Redis'i kullanır. client.go ve workers.go, okuma ve yazma işlemleri için Redis ile bağlantı kurmalıdır. Yerel olarak çalışan Redis sunucusu için bağlantıyı belirtmek için RedisClientOpt kullanacağız.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Gerekli değilse şifre atlanabilir
    Password: "mypassword",
    // Asynq için ayrılmış bir veritabanı numarası kullanın.
    // Varsayılan olarak, Redis 16 veritabanı sağlar (0 ile 15 arası).
    DB: 0,
}

Görevler

asynqda, iş birimleri Task adlı bir tipte kapsüllenmiştir ve kavramsal olarak Type ve Payload olmak üzere iki alanı vardır.

// Type, görevin türünü belirten bir dize değeridir.
func (t *Task) Type() string

// Payload, görevin yürütülmesi için gerekli veridir.
func (t *Task) Payload() []byte

Temel tipleri incelediğimize göre, programlarımızı yazmaya başlayalım.

İstemci Programı

client.go dosyasında, asynq.Client kullanarak birkaç görev oluşturacağız ve bunları kuyruğa alacağız.

Bir görev oluşturmak için NewTask fonksiyonunu kullanabilir ve görevin türünü ve içeriğini iletebilirsiniz.

Enqueue yöntemi, bir görevi ve isteğe bağlı olarak bir dizi seçeneği alır. Görevleri gelecekteki işlemler için planlamak için ProcessIn veya ProcessAt seçeneklerini kullanın.

// E-posta görevleriyle ilgili payload.
type EmailTaskPayload struct {
    // E-posta alıcısının ID'si.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Tür adı ve payload ile bir görev oluşturun.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Görevleri hemen işleyin.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Görev başarıyla kuyruğa alındı: %+v", info)

    // Görevleri 24 saat sonra işleyin.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Görev başarıyla kuyruğa alındı: %+v", info)
}

İşte istemci programımız için gerekenler.

Çalışanlar Programı

workers.go dosyasında, çalışanları başlatmak için bir asynq.Server örneği oluşturacağız.

NewServer fonksiyonu, RedisConnOpt ve Config parametrelerini alır.

Config, sunucunun görev işleme davranışını ayarlamak için kullanılır. Tüm kullanılabilir yapılandırma seçeneklerini öğrenmek için Config belgelerine başvurabilirsiniz.

Basitlik açısından, bu örnekte yalnızca paralellik derecesini belirtiyoruz.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Not: Aşağıdaki bölümde, 'handler'ın ne olduğunu tanıtacağız.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

(*Server).Run metodunun parametresi olan arayüz asynq.Handler, ProcessTask methoduna sahiptir.

type Handler interface {
    // Görev başarıyla işlenirse, ProcessTask nil dönmelidir.
    // ProcessTask nil olmayan bir hata döndürürse veya bir panik oluşturursa, görev daha sonra yeniden denenir.
    ProcessTask(context.Context, *Task) error
}

Handler'ı uygulamanın en basit yolu, aynı imzaya sahip bir fonksiyon tanımlamak ve onu Run'a iletilirken asynq.HandlerFunc adaptör türünü kullanmaktır.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Kullanıcıya hoş geldiniz e-postası gönderiliyor %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Hatırlatma e-postası kullanıcıya gönderiliyor %d", p.UserID)

    default:
        return fmt.Errorf("Beklenmeyen görev türü: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // işlevi işlemek için asynq.HandlerFunc adaptörünü kullanın
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Bu handler fonksiyonu için switch case'lerini eklemeye devam edebiliriz, ancak gerçek bir uygulamada, her durum için mantığı ayrı bir işlevde tanımlamak daha uygun olacaktır.

Kodumuzu yeniden yapılandırmak için, handler'ımızı oluşturmak için ServeMux'u kullanalım. "net/http" paketinden ServeMux gibi, Handle veya HandleFunc çağrılarıyla bir işleyiciyi kaydedebilirsiniz. ServeMux, Handler arayüzünü karşıladığından, (*Server).Run'a iletilir.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Kullanıcıya hoş geldiniz e-postası gönderiliyor %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Hatırlatma e-postası kullanıcıya gönderiliyor %d", p.UserID)
    return nil
}

Şimdi, her bir görev türü için işleme işlevlerini çıkardığımıza göre, kod daha düzenli görünüyor. Bununla birlikte, kod hala biraz belirsiz. Görev türleri ve payload türleri için bu dize değerlerine sahibiz ve bunları organik bir pakette kapsamlandırmalıyız. Kodumuzu yeniden yapılandıralım ve görev oluşturmayı ve işlemeyi kapsayan bir paket yazalım. Basitçe, task adında bir paket oluşturuyoruz.

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// List of task types.
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// Payload for any task related to emails.
type EmailTaskPayload struct {
    // ID of the email recipient.
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Kullanıcıya hoş geldiniz e-postası gönderiliyor: %d", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Hatırlatma e-postası kullanıcıya gönderiliyor: %d", p.UserID)
    return nil
}

Şimdi client.go ve workers.go dosyalarında bu paketi içe aktarabiliriz.

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // Görevi hemen kuyruğa ekleyin.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Görev başarıyla kuyruğa eklendi: %+v", info)

    // Görevi 24 saat sonra işlenmek üzere kuyruğa ekleyin.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Görev başarıyla kuyruğa eklendi: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

Kod şimdi daha iyi görünüyor!

Artık client ve workers hazır olduğuna göre, bu iki programı çalıştırabiliriz. İlk olarak, görevleri oluşturup planlamak için client programını çalıştırarak başlayalım.

go run client/client.go

Bu, hemen işlenmesi için bir görev ve 24 saat sonra işlenmek üzere başka bir görev oluşturacaktır.

Görevleri incelemek için asynq komut satırı arayüzünü kullanalım.

asynq dash

Enqueued durumunda bir görev ve Scheduled durumunda başka bir görev görmelisiniz.

Not: Her durumun anlamını anlamak için lütfen Görev Yaşam Döngüsü sayfasına başvurunuz.

Son olarak, görevleri işlemek üzere workers programını başlatalım.

go run workers/workers.go

Not: Bu program, onu sonlandırana kadar çıkmayacaktır. Arkaplan işçilerini güvenli bir şekilde sonlandırmak için en iyi uygulamalar için lütfen Sinyal Wiki sayfasına başvurunuz.

Başarılı görev işleme işlemini belirten bir metin çıkışı terminalde görmelisiniz.

client programını yeniden çalıştırarak işçilerin nasıl kabul edip işlediğini görebilirsiniz.

Bir görevin ilk denemede başarılı bir şekilde işlenemediği durumlar sık görülebilir. Varsayılan olarak, başarısız görevler üstel geriye doğru 25 kez yeniden denenecektir. Başarısız bir durumu simüle etmek için işleyicimizi güncelleyelim ve bir hata döndürelim.

// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Kullanıcıya hoş geldin e-postası göndermeye çalışılıyor %d...", p.UserID)
    return fmt.Errorf("Kullanıcıya e-posta gönderilemedi")
}

Çalışan programımızı yeniden başlatalım ve bir görevi sıraya alalım.

go run workers/workers.go
go run client/client.go

Eğer asynq dash çalıştırıyorsanız, Retry durumunda bir görevi ("retry" sekmesini vurgulayarak) görüntüleyebilmelisiniz.

Retry durumunda olan görevleri kontrol etmek için aynı zamanda şunu da çalıştırabilirsiniz:

asynq task ls --queue=default --state=retry

Bu, gelecekte yeniden denenecek tüm görevleri listeler. Çıktı, her görevin bir sonraki çalıştırma zamanını da içerir.

Bir görevin yeniden deneme girişimlerini tüketmesi durumunda, Archived durumuna geçer ve bir daha yeniden denemez (arşivlenmiş görevleri hala CLI veya WebUI araçları kullanarak manuel olarak çalıştırabilirsiniz).

Bu kılavuzu tamamlamadan önce, işleyicimizi düzeltelim.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Kullanıcıya hoş geldin e-postası gönderiliyor %d", p.UserID)
    return nil 
}