На этой странице я объясню дизайн интерфейса Handler
.
Handler
, который вы предоставляете серверу, является основой вашей логики обработки асинхронных задач. Обязанность Handler - принять задачу и обработать ее с учетом контекста. Если обработка завершится неудачей, он должен сообщить об ошибках для последующей попытки выполнения задачи.
Интерфейс определен следующим образом:
type Handler interface {
ProcessTask(context.Context, *Task) error
}
Это простой интерфейс, кратко описывающий обязанности Handler.
Существует несколько способов реализовать этот интерфейс обработчика.
Вот пример определения собственного типа структуры для обработки задач:
type MyTaskHandler struct {
// ... поля
}
// Реализация метода ProcessTask
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// ... логика обработки задачи
}
Вы можете даже определить функцию для удовлетворения интерфейса благодаря адаптеру типа HandlerFunc
.
func myHandler(ctx context.Context, t *asynq.Task) error {
// ... логика обработки задачи
}
// h удовлетворяет интерфейсу Handler
h := asynq.HandlerFunc(myHandler)
В большинстве случаев вам может потребоваться проверить тип входной задачи Type
и обработать его соответственно.
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "type1":
// обработать type1
case "type2":
// обработать type2
case "typeN":
// обработать typeN
default:
return fmt.Errorf("неожиданный тип задачи: %q", t.Type())
}
}
Как видите, обработчик может состоять из многих различных обработчиков. Каждый случай в приведенном выше примере может быть обработан отдельным обработчиком. В этом месте используется тип ServeMux
.
Примечание: для реализации обработчика вам необязательно использовать тип ServeMux
, но во многих случаях он может пригодиться.
Используя ServeMux
, вы можете зарегистрировать несколько обработчиков. Он будет сопоставлять тип каждой задачи с зарегистрированными шаблонами и вызывать обработчик, соответствующий типу задачи, самому близкому по образцу к имени типа задачи.
mux := asynq.NewServeMux()
mux.Handle("email:welcome", welcomeEmailHandler) // Зарегистрировать обработчик
mux.Handle("email:reminder", reminderEmailHandler)
mux.Handle("email:", defaultEmailHandler) // Обработчик по умолчанию для других типов задач, начинающихся с "email:"
Использование Middleware
Если вам нужно выполнить некоторый код до и/или после обработки запросов, вы можете сделать это с помощью промежуточного программного обеспечения. Middleware - это функция, которая принимает Handler
и возвращает Handler
.
Ниже приведен пример промежуточного программного обеспечения, которое регистрирует начало и конец обработки задачи.
func loggingMiddleware(h asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
start := time.Now()
log.Printf("Начало обработки для %q", t.Type())
err := h.ProcessTask(ctx, t)
if err != nil {
return err
}
log.Printf("Обработка для %q завершена: затраченное время = %v", t.Type(), time.Since(start))
return nil
})
}
Теперь вы можете использовать это промежуточное программное обеспечение для “обертывания” вашего обработчика.
myHandler = loggingMiddleware(myHandler)
Кроме того, если вы используете ServeMux
, вы можете указать промежуточное программное обеспечение следующим образом.
mux := NewServeMux()
mux.Use(loggingMiddleware)
Группировка промежуточного ПО
Если у вас есть сценарий, в котором вы хотите применить промежуточное ПО к группе задач, вы можете достичь этого путем объединения нескольких экземпляров ServeMux
. Одно ограничение заключается в том, что у каждой группы задач должен быть одинаковый префикс в их типах.
Пример:
Если у вас есть задачи для обработки заказов и задачи для обработки продуктов, и вы хотите применить общую логику ко всем задачам “продуктов” и другую общую логику ко всем задачам “заказов”, вы можете сделать это следующим образом:
handlersПродукт := asynq.NewServeMux()
handlersПродукт.Use(промежуточноеПродукта) // Применить общую логику ко всем задачам "продуктов"
handlersПродукт.HandleFunc("продукт:обновление", обработчикЗадачиПродуктаОбновления)
// ... Регистрация других обработчиков задач "продуктов"
handlersЗаказ := asynq.NewServeMux()
handlersЗаказ.Use(промежуточноеЗаказа) // Применить общую логику ко всем задачам "заказов"
handlersЗаказ.HandleFunc("заказ:возврат", обработчикЗадачиВозврата)
// ... Регистрация других обработчиков задач "заказов"
// Обработчик верхнего уровня
mux := asynq.NewServeMux()
mux.Use(некотороеГлобальноеПромежуточноеПО) // Применить общую логику ко всем задачам
mux.Handle("продукт:", handlersПродукт)
mux.Handle("заказ:", handlersЗаказ)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}