Contoh Konfigurasi Server asynq untuk Melaksanakan Pembatasan Tingkat Pemrosesan Tugas
Halaman ini menunjukkan cara mengonfigurasi server asynq untuk melaksanakan pembatasan tingkat pemrosesan tugas.
Harap dicatat bahwa ini adalah pembatasan tingkat untuk setiap instansi server, bukan pembatasan tingkat global.
Pada contoh ini, kita akan menggunakan paket golang.org/x/time/rate
untuk menunjukkan pembatasan tingkat. Konfigurasi utama dalam pengaturan inisialisasi server Anda adalah IsFailure
dan RetryDelayFunc
. Kita akan membuat tipe kesalahan kustom dan mengekspresikan tipe kesalahan yang diberikan dalam fungsi IsFailure
dan RetryDelayFunc
.
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"time"
"golang.org/x/time/rate"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{
Concurrency: 10,
// Jangan menghitung suatu kesalahan sebagai kegagalan jika itu disebabkan oleh pembatasan tingkat.
IsFailure: func(err error) bool { return !IsRateLimitError(err) },
RetryDelayFunc: retryDelay,
},
)
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
type RateLimitError struct {
RetryIn time.Duration
}
func (e *RateLimitError) Error() string {
return fmt.Sprintf("Batasan tingkat tercapai (coba lagi dalam %v)", e.RetryIn)
}
func IsRateLimitError(err error) bool {
_, ok := err.(*RateLimitError)
return ok
}
func retryDelay(n int, err error, task *asynq.Task) time.Duration {
var ratelimitErr *RateLimitError
if errors.As(err, &ratelimitErr) {
return ratelimitErr.RetryIn
}
return asynq.DefaultRetryDelayFunc(n, err, task)
}
// Pembatasan tingkat 10 acara per detik, memungkinkan lonjakan hingga 30 acara.
var limiter = rate.NewLimiter(10, 30)
func handler(ctx context.Context, task *asynq.Task) error {
if !limiter.Allow() {
return &RateLimitError{
RetryIn: time.Duration(rand.Intn(10)) * time.Second,
}
}
log.Printf("[*] Menangani tugas %s", task.Payload())
return nil
}