Panduan Memulai

Dalam tutorial ini, kita akan menulis dua program, client dan workers.

  • client.go akan membuat dan menjadwalkan tugas-tugas untuk diproses secara asinkron oleh thread pekerja latar belakang.
  • workers.go akan memulai beberapa thread pekerja bersamaan untuk menangani tugas-tugas yang dibuat oleh klien.

Panduan ini mengasumsikan bahwa Anda menjalankan server Redis pada localhost:6379. Sebelum memulai, pastikan Redis terpasang dan berjalan.

Mari kita pertama-tama membuat dua file utama kita.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Kemudian, pasang paket asynq.

go get -u github.com/hibiken/asynq

Sebelum kita mulai menulis kode, mari kita tinjau beberapa tipe inti yang akan digunakan dalam kedua program ini.

Opsi Koneksi Redis

Asynq menggunakan Redis sebagai piala pesan. Baik client.go maupun workers.go perlu terhubung ke Redis untuk operasi baca dan tulis. Kita akan menggunakan RedisClientOpt untuk menentukan koneksi ke server Redis yang berjalan secara lokal.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Kata sandi dapat dihilangkan jika tidak diperlukan
    Password: "katasandisaya",
    // Gunakan nomor database yang didedikasikan untuk asynq.
    // Secara default, Redis menyediakan 16 basis data (0 hingga 15).
    DB: 0,
}

Tugas

Dalam asynq, unit kerja dienkapsulasi dalam tipe yang disebut Task, yang secara konseptual memiliki dua bidang: Type dan Payload.

// Type adalah nilai string yang menunjukkan jenis tugas.
func (t *Task) Type() string

// Payload adalah data yang diperlukan untuk eksekusi tugas.
func (t *Task) Payload() []byte

Sekarang setelah kita melihat tipe inti, mari kita mulai menulis program-program kita.

Program Klien

Di client.go, kita akan membuat beberapa tugas dan mendaftarkannya menggunakan asynq.Client.

Untuk membuat tugas, Anda dapat menggunakan fungsi NewTask dan meneruskan jenis dan payload tugas.

Metode Enqueue mengambil tugas dan sejumlah opsi. Gunakan opsi ProcessIn atau ProcessAt untuk menjadwalkan tugas-tugas untuk diproses di masa mendatang.

// Payload terkait dengan tugas-tugas email.
type EmailTaskPayload struct {
    // ID penerima email.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Buat tugas dengan nama jenis dan payload.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Proses tugas-tugas dengan segera.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Berhasil mendaftarkan tugas: %+v", info)

    // Proses tugas-tugas setelah 24 jam.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Berhasil mendaftarkan tugas: %+v", info)
}

Itu saja yang kita butuhkan untuk program klien kita.

Program Pekerja

Pada workers.go, kita akan membuat instance asynq.Server untuk memulai pekerja.

Fungsi NewServer mengambil RedisConnOpt dan Config sebagai parameter.

Config digunakan untuk menyesuaikan perilaku pemrosesan tugas server. Anda dapat merujuk ke dokumentasi Config untuk mempelajari semua opsi konfigurasi yang tersedia.

Untuk tujuan kesimpelan, dalam contoh ini, kita hanya menentukan konkurensi.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Catatan: Pada bagian berikut, kita akan memperkenalkan apa itu `handler`.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

Parameter metode (*Server).Run adalah antarmuka asynq.Handler, yang memiliki metode ProcessTask.

type Handler interface {
    // Jika tugas diproses dengan sukses, ProcessTask harus mengembalikan nil.
    // Jika ProcessTask mengembalikan error yang tidak-nil atau menyebabkan panic, tugas akan diulang nanti.
    ProcessTask(context.Context, *Task) error
}

Cara termudah untuk mengimplementasikan penangan adalah dengan mendefinisikan fungsi dengan tanda tangan yang sama dan menggunakan tipe adapter asynq.HandlerFunc saat meneruskannya ke Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Mengirim email selamat datang ke pengguna %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Mengirim email pengingat ke pengguna %d", p.UserID)

    default:
        return fmt.Errorf("Jenis tugas yang tidak terduga: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Gunakan adapter asynq.HandlerFunc untuk menangani fungsi
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Kita dapat melanjutkan menambahkan kasus beralih untuk fungsi penangan ini, tetapi dalam aplikasi yang sebenarnya, akan lebih nyaman untuk mendefinisikan logika untuk setiap kasus dalam fungsi terpisah.

Untuk memperbaiki kode kita, mari gunakan ServeMux untuk membuat penangan kita. Sama seperti ServeMux dari paket "net/http", Anda dapat mendaftarkan penangan dengan memanggil Handle atau HandleFunc. ServeMux memenuhi antarmuka Handler, sehingga dapat diteruskan ke (*Server).Run.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", kirimEmailSelamatDatang)
    mux.HandleFunc("email:reminder", kirimEmailPengingat)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func kirimEmailSelamatDatang(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Mengirim email selamat datang ke pengguna %d", p.UserID)
    return nil
}

func kirimEmailPengingat(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Mengirim email pengingat ke pengguna %d", p.UserID)
    return nil
}

Sekarang, setelah kita mengekstrak fungsi penangan untuk setiap jenis tugas, kode terlihat lebih terorganisir. Namun, kode masih terlalu implisit. Kita memiliki nilai string untuk jenis tugas dan jenis muatan, dan kita seharusnya menyematkannya dalam paket organik. Mari perbaiki kode kita dan tulis paket untuk menyematkan pembuatan tugas dan penangan. Kita cukup membuat paket yang disebut task.

mkdir task && touch task/task.go
package tugas

import (
	"context"
	"encoding/json"
	"log"

	"github.com/hibiken/asynq"
)

// Daftar jenis tugas.
const (
	TypeWelcomeEmail  = "email:selamatdatang"
	TypeReminderEmail = "email:pengingat"
)

// Payload untuk tugas terkait email.
type EmailTaskPayload struct {
	// ID penerima email.
	UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Mengirim email selamat datang ke pengguna %d", p.UserID)
	return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Mengirim email pengingat ke pengguna %d", p.UserID)
	return nil
}

Sekarang kita dapat mengimpor paket ini di client.go dan workers.go.

```go
// client.go
func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

	t1, err := tugas.NewWelcomeEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	t2, err := tugas.NewReminderEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	// Enqueue tugas secara langsung.
	info, err := client.Enqueue(t1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] Tugas berhasil dienqueue: %+v", info)

	// Enqueue tugas untuk diproses setelah 24 jam.
	info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] Tugas berhasil dienqueue: %+v", info)
}
// workers.go
func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{Concurrency: 10},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc(tugas.TypeWelcomeEmail, tugas.HandleWelcomeEmailTask)
	mux.HandleFunc(tugas.TypeReminderEmail, tugas.HandleReminderEmailTask)

	if err := srv.Run(mux); err != nil {
		log.Fatal(err)
	}
}

Kode terlihat lebih baik sekarang!

Sekarang kita sudah memiliki client dan workers yang siap, kita bisa menjalankan kedua program ini. Mari mulai dengan menjalankan program client untuk membuat dan menjadwalkan tugas-tugas.

go run client/client.go

Ini akan membuat dua tugas: satu untuk diproses secara langsung dan satu lagi untuk diproses setelah 24 jam.

Mari gunakan antarmuka baris perintah asynq untuk memeriksa tugas-tugas.

asynq dash

Anda seharusnya dapat melihat satu tugas dalam status Enqueued dan tugas lain dalam status Scheduled.

Catatan: Untuk memahami arti setiap status, silakan merujuk ke Siklus Hidup Tugas.

Terakhir, mari jalankan program workers untuk menangani tugas-tugas.

go run workers/workers.go

Catatan: Program ini tidak akan berhenti sampai Anda mengirim sinyal untuk menghentikannya. Untuk praktik terbaik tentang cara menghentikan secara aman worker di latar belakang, silakan merujuk ke Halaman Wiki Sinyal.

Anda seharusnya dapat melihat beberapa hasil keluaran teks di terminal, yang menunjukkan pemrosesan tugas-tugas dengan sukses.

Anda juga dapat menjalankan program client lagi untuk melihat bagaimana worker menerima dan memproses tugas-tugas.

Tidak jarang bagi sebuah tugas untuk gagal diproses dengan sukses pada percobaan pertama. Secara default, tugas yang gagal akan dicoba ulang sebanyak 25 kali dengan pengurangan eksponensial. Mari perbarui handler kita untuk mengembalikan sebuah error untuk mensimulasikan situasi yang tidak berhasil.

// tugas.go```
```go
fungsi HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) kesalahan {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        kembali err
    }
    log.Printf(" [*] Mencoba mengirimkan email selamat datang ke pengguna %d...", p.UserID)
    return fmt.Errorf("Gagal mengirim email ke pengguna")
}

Mari restart program pekerja kita dan masukkan sebuah tugas.

go run pekerja/pekerja.go
go run klien/klien.go

Jika Anda menjalankan asynq dash, Anda seharusnya dapat melihat sebuah tugas dalam status Retry (dengan menavigasi ke tampilan detail antrian dan menyoroti tab "retry").

Untuk memeriksa tugas-tugas yang berada dalam status retry, Anda juga dapat menjalankan:

asynq tugas ls --antrian=default --status=retry

Ini akan menampilkan semua tugas yang akan diulang di masa depan. Outputnya termasuk waktu yang diharapkan untuk eksekusi berikutnya untuk setiap tugas.

Setelah sebuah tugas habis mencoba ulang, tugas tersebut akan beralih ke status Archived dan tidak akan diulang lagi (Anda masih bisa secara manual menjalankan tugas-tugas yang diarsipkan menggunakan perangkat CLI atau WebUI).

Sebelum menyimpulkan tutorial ini, mari perbaiki handler kita.

fungsi HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) kesalahan {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        kembali err
    }
    log.Printf(" [*] Mengirim email selamat datang ke pengguna %d", p.UserID)
    return nil 
}