Panduan Memulai
Dalam tutorial ini, kita akan menulis dua program, client
dan workers
.
-
client.go
akan membuat dan menjadwalkan tugas-tugas untuk diproses secara asinkron oleh thread pekerja latar belakang. -
workers.go
akan memulai beberapa thread pekerja bersamaan untuk menangani tugas-tugas yang dibuat oleh klien.
Panduan ini mengasumsikan bahwa Anda menjalankan server Redis pada localhost:6379
. Sebelum memulai, pastikan Redis terpasang dan berjalan.
Mari kita pertama-tama membuat dua file utama kita.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
Kemudian, pasang paket asynq
.
go get -u github.com/hibiken/asynq
Sebelum kita mulai menulis kode, mari kita tinjau beberapa tipe inti yang akan digunakan dalam kedua program ini.
Opsi Koneksi Redis
Asynq menggunakan Redis sebagai piala pesan. Baik client.go
maupun workers.go
perlu terhubung ke Redis untuk operasi baca dan tulis. Kita akan menggunakan RedisClientOpt
untuk menentukan koneksi ke server Redis yang berjalan secara lokal.
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// Kata sandi dapat dihilangkan jika tidak diperlukan
Password: "katasandisaya",
// Gunakan nomor database yang didedikasikan untuk asynq.
// Secara default, Redis menyediakan 16 basis data (0 hingga 15).
DB: 0,
}
Tugas
Dalam asynq
, unit kerja dienkapsulasi dalam tipe yang disebut Task
, yang secara konseptual memiliki dua bidang: Type
dan Payload
.
// Type adalah nilai string yang menunjukkan jenis tugas.
func (t *Task) Type() string
// Payload adalah data yang diperlukan untuk eksekusi tugas.
func (t *Task) Payload() []byte
Sekarang setelah kita melihat tipe inti, mari kita mulai menulis program-program kita.
Program Klien
Di client.go
, kita akan membuat beberapa tugas dan mendaftarkannya menggunakan asynq.Client
.
Untuk membuat tugas, Anda dapat menggunakan fungsi NewTask
dan meneruskan jenis dan payload tugas.
Metode Enqueue
mengambil tugas dan sejumlah opsi. Gunakan opsi ProcessIn
atau ProcessAt
untuk menjadwalkan tugas-tugas untuk diproses di masa mendatang.
// Payload terkait dengan tugas-tugas email.
type EmailTaskPayload struct {
// ID penerima email.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// Buat tugas dengan nama jenis dan payload.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// Proses tugas-tugas dengan segera.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Berhasil mendaftarkan tugas: %+v", info)
// Proses tugas-tugas setelah 24 jam.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Berhasil mendaftarkan tugas: %+v", info)
}
Itu saja yang kita butuhkan untuk program klien kita.
Program Pekerja
Pada workers.go
, kita akan membuat instance asynq.Server
untuk memulai pekerja.
Fungsi NewServer
mengambil RedisConnOpt
dan Config
sebagai parameter.
Config
digunakan untuk menyesuaikan perilaku pemrosesan tugas server. Anda dapat merujuk ke dokumentasi Config
untuk mempelajari semua opsi konfigurasi yang tersedia.
Untuk tujuan kesimpelan, dalam contoh ini, kita hanya menentukan konkurensi.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Catatan: Pada bagian berikut, kita akan memperkenalkan apa itu `handler`.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
Parameter metode (*Server).Run
adalah antarmuka asynq.Handler
, yang memiliki metode ProcessTask
.
type Handler interface {
// Jika tugas diproses dengan sukses, ProcessTask harus mengembalikan nil.
// Jika ProcessTask mengembalikan error yang tidak-nil atau menyebabkan panic, tugas akan diulang nanti.
ProcessTask(context.Context, *Task) error
}
Cara termudah untuk mengimplementasikan penangan adalah dengan mendefinisikan fungsi dengan tanda tangan yang sama dan menggunakan tipe adapter asynq.HandlerFunc
saat meneruskannya ke Run
.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Mengirim email selamat datang ke pengguna %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Mengirim email pengingat ke pengguna %d", p.UserID)
default:
return fmt.Errorf("Jenis tugas yang tidak terduga: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Gunakan adapter asynq.HandlerFunc untuk menangani fungsi
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
Kita dapat melanjutkan menambahkan kasus beralih untuk fungsi penangan ini, tetapi dalam aplikasi yang sebenarnya, akan lebih nyaman untuk mendefinisikan logika untuk setiap kasus dalam fungsi terpisah.
Untuk memperbaiki kode kita, mari gunakan ServeMux
untuk membuat penangan kita. Sama seperti ServeMux
dari paket "net/http"
, Anda dapat mendaftarkan penangan dengan memanggil Handle
atau HandleFunc
. ServeMux
memenuhi antarmuka Handler
, sehingga dapat diteruskan ke (*Server).Run
.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", kirimEmailSelamatDatang)
mux.HandleFunc("email:reminder", kirimEmailPengingat)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func kirimEmailSelamatDatang(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Mengirim email selamat datang ke pengguna %d", p.UserID)
return nil
}
func kirimEmailPengingat(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Mengirim email pengingat ke pengguna %d", p.UserID)
return nil
}
Sekarang, setelah kita mengekstrak fungsi penangan untuk setiap jenis tugas, kode terlihat lebih terorganisir. Namun, kode masih terlalu implisit. Kita memiliki nilai string untuk jenis tugas dan jenis muatan, dan kita seharusnya menyematkannya dalam paket organik. Mari perbaiki kode kita dan tulis paket untuk menyematkan pembuatan tugas dan penangan. Kita cukup membuat paket yang disebut task
.
mkdir task && touch task/task.go
package tugas
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// Daftar jenis tugas.
const (
TypeWelcomeEmail = "email:selamatdatang"
TypeReminderEmail = "email:pengingat"
)
// Payload untuk tugas terkait email.
type EmailTaskPayload struct {
// ID penerima email.
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Mengirim email selamat datang ke pengguna %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Mengirim email pengingat ke pengguna %d", p.UserID)
return nil
}
Sekarang kita dapat mengimpor paket ini di client.go
dan workers.go
.
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := tugas.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := tugas.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// Enqueue tugas secara langsung.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tugas berhasil dienqueue: %+v", info)
// Enqueue tugas untuk diproses setelah 24 jam.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tugas berhasil dienqueue: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(tugas.TypeWelcomeEmail, tugas.HandleWelcomeEmailTask)
mux.HandleFunc(tugas.TypeReminderEmail, tugas.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
Kode terlihat lebih baik sekarang!
Sekarang kita sudah memiliki client
dan workers
yang siap, kita bisa menjalankan kedua program ini. Mari mulai dengan menjalankan program client
untuk membuat dan menjadwalkan tugas-tugas.
go run client/client.go
Ini akan membuat dua tugas: satu untuk diproses secara langsung dan satu lagi untuk diproses setelah 24 jam.
Mari gunakan antarmuka baris perintah asynq
untuk memeriksa tugas-tugas.
asynq dash
Anda seharusnya dapat melihat satu tugas dalam status Enqueued dan tugas lain dalam status Scheduled.
Catatan: Untuk memahami arti setiap status, silakan merujuk ke Siklus Hidup Tugas.
Terakhir, mari jalankan program workers
untuk menangani tugas-tugas.
go run workers/workers.go
Catatan: Program ini tidak akan berhenti sampai Anda mengirim sinyal untuk menghentikannya. Untuk praktik terbaik tentang cara menghentikan secara aman worker di latar belakang, silakan merujuk ke Halaman Wiki Sinyal.
Anda seharusnya dapat melihat beberapa hasil keluaran teks di terminal, yang menunjukkan pemrosesan tugas-tugas dengan sukses.
Anda juga dapat menjalankan program client
lagi untuk melihat bagaimana worker menerima dan memproses tugas-tugas.
Tidak jarang bagi sebuah tugas untuk gagal diproses dengan sukses pada percobaan pertama. Secara default, tugas yang gagal akan dicoba ulang sebanyak 25 kali dengan pengurangan eksponensial. Mari perbarui handler kita untuk mengembalikan sebuah error untuk mensimulasikan situasi yang tidak berhasil.
// tugas.go```
```go
fungsi HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) kesalahan {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
kembali err
}
log.Printf(" [*] Mencoba mengirimkan email selamat datang ke pengguna %d...", p.UserID)
return fmt.Errorf("Gagal mengirim email ke pengguna")
}
Mari restart program pekerja kita dan masukkan sebuah tugas.
go run pekerja/pekerja.go
go run klien/klien.go
Jika Anda menjalankan asynq dash
, Anda seharusnya dapat melihat sebuah tugas dalam status Retry (dengan menavigasi ke tampilan detail antrian dan menyoroti tab "retry").
Untuk memeriksa tugas-tugas yang berada dalam status retry, Anda juga dapat menjalankan:
asynq tugas ls --antrian=default --status=retry
Ini akan menampilkan semua tugas yang akan diulang di masa depan. Outputnya termasuk waktu yang diharapkan untuk eksekusi berikutnya untuk setiap tugas.
Setelah sebuah tugas habis mencoba ulang, tugas tersebut akan beralih ke status Archived dan tidak akan diulang lagi (Anda masih bisa secara manual menjalankan tugas-tugas yang diarsipkan menggunakan perangkat CLI atau WebUI).
Sebelum menyimpulkan tutorial ini, mari perbaiki handler kita.
fungsi HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) kesalahan {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
kembali err
}
log.Printf(" [*] Mengirim email selamat datang ke pengguna %d", p.UserID)
return nil
}