Trong trang này, tôi sẽ giải thích thiết kế của giao diện Handler
.
Giao diện Handler
mà bạn cung cấp cho máy chủ là trung tâm của logic xử lý nhiệm vụ không đồng bộ của bạn. Trách nhiệm của Handler là chấp nhận một nhiệm vụ và xử lý nó trong khi xem xét ngữ cảnh. Nếu quá trình xử lý thất bại, nó phải báo cáo mọi lỗi để thử lại nhiệm vụ sau này.
Giao diện được định nghĩa như sau:
type Handler interface {
ProcessTask(context.Context, *Task) error
}
Đây là một giao diện đơn giản, mô tả ngắn gọn trách nhiệm của một Handler.
Có nhiều cách để triển khai giao diện người xử lý này.
Dưới đây là một ví dụ về cách xác định kiểu đối tượng struct riêng để xử lý các nhiệm vụ:
type MyTaskHandler struct {
// ... trường dữ liệu
}
// Triển khai phương thức ProcessTask
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// ... logic xử lý công việc
}
Bạn thậm chí có thể xác định một hàm để đáp ứng giao diện, nhờ vào kiểu adapter HandlerFunc
.
func myHandler(ctx context.Context, t *asynq.Task) error {
// ... logic xử lý công việc
}
// h đáp ứng giao diện Handler
h := asynq.HandlerFunc(myHandler)
Trong hầu hết các trường hợp, bạn có thể cần kiểm tra Type
của nhiệm vụ đầu vào và xử lý nó tương ứng.
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "type1":
// xử lý type1
case "type2":
// xử lý type2
case "typeN":
// xử lý typeN
default:
return fmt.Errorf("loại nhiệm vụ không mong đợi: %q", t.Type())
}
}
Như bạn có thể thấy, một người xử lý có thể được tạo thành từ nhiều người xử lý khác nhau. Mỗi trường hợp trong ví dụ trên có thể được xử lý bởi một người xử lý riêng. Đây là nơi mà kiểu ServeMux
hoạt động.
Lưu ý: Bạn không nhất thiết phải sử dụng kiểu ServeMux
để triển khai một người xử lý, nhưng nó có thể rất hữu ích trong nhiều trường hợp.
Bằng cách sử dụng ServeMux
, bạn có thể đăng ký nhiều người xử lý. Nó sẽ khớp loại từng nhiệm vụ với các mẫu đã đăng ký và gọi người xử lý tương ứng với mẫu gần nhất với tên loại nhiệm vụ.
mux := asynq.NewServeMux()
mux.Handle("email:welcome", welcomeEmailHandler) // Đăng ký người xử lý
mux.Handle("email:reminder", reminderEmailHandler)
mux.Handle("email:", defaultEmailHandler) // Người xử lý mặc định cho loại nhiệm vụ khác bắt đầu bằng "email:"
Sử dụng Middleware
Nếu bạn cần thực hiện một số mã trước và/hoặc sau khi xử lý các yêu cầu, bạn có thể làm điều này bằng middleware. Middleware là một hàm nhận một Handler
và trả về một Handler
.
Dưới đây là một ví dụ về middleware ghi lại điểm bắt đầu và kết thúc của quá trình xử lý nhiệm vụ.
func loggingMiddleware(h asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
start := time.Now()
log.Printf("Bắt đầu xử lý cho %q", t.Type())
err := h.ProcessTask(ctx, t)
if err != nil {
return err
}
log.Printf("Hoàn thành xử lý cho %q: thời gian trôi qua = %v", t.Type(), time.Since(start))
return nil
})
}
Bây giờ bạn có thể sử dụng middleware này để “bao” người xử lý của bạn.
myHandler = loggingMiddleware(myHandler)
Ngoài ra, nếu bạn đang sử dụng ServeMux
, bạn có thể cung cấp middleware như sau.
mux := NewServeMux()
mux.Use(loggingMiddleware)
Nhóm Middleware
Nếu bạn có một tình huống muốn áp dụng một middleware cho một nhóm công việc, bạn có thể đạt được điều này bằng cách kết hợp nhiều trường hợp của ServeMux
. Một hạn chế là mỗi nhóm công việc cần có cùng tiền tố trong tên loại của họ.
Ví dụ:
Nếu bạn có công việc để xử lý đơn đặt hàng và công việc để xử lý sản phẩm, và bạn muốn áp dụng logic chia sẻ cho tất cả các công việc “sản phẩm” và logic chia sẻ khác cho tất cả các công việc “đơn hàng”, bạn có thể làm như sau:
productHandlers := asynq.NewServeMux()
productHandlers.Use(productMiddleware) // Áp dụng logic chia sẻ cho tất cả các công việc sản phẩm
productHandlers.HandleFunc("product:update", productUpdateTaskHandler)
// ... Đăng ký các trình xử lý công việc "sản phẩm" khác
orderHandlers := asynq.NewServeMux()
orderHandler.Use(orderMiddleware) // Áp dụng logic chia sẻ cho tất cả các công việc đơn hàng
orderHandlers.HandleFunc("order:refund", orderRefundTaskHandler)
// ... Đăng ký các trình xử lý công việc "đơn hàng" khác
// Trình xử lý cấp cao
mux := asynq.NewServeMux()
mux.Use(someGlobalMiddleware) // Áp dụng logic chia sẻ cho tất cả các công việc
mux.Handle("product:", productHandlers)
mux.Handle("order:", orderHandlers)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}