На этой странице я объясню дизайн интерфейса Handler.

Handler, который вы предоставляете серверу, является основой вашей логики обработки асинхронных задач. Обязанность Handler - принять задачу и обработать ее с учетом контекста. Если обработка завершится неудачей, он должен сообщить об ошибках для последующей попытки выполнения задачи.

Интерфейс определен следующим образом:

type Handler interface {
    ProcessTask(context.Context, *Task) error
}

Это простой интерфейс, кратко описывающий обязанности Handler.

Существует несколько способов реализовать этот интерфейс обработчика.

Вот пример определения собственного типа структуры для обработки задач:

type MyTaskHandler struct {
   // ... поля
}

// Реализация метода ProcessTask
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
   // ... логика обработки задачи
}

Вы можете даже определить функцию для удовлетворения интерфейса благодаря адаптеру типа HandlerFunc.

func myHandler(ctx context.Context, t *asynq.Task) error {
    // ... логика обработки задачи
}

// h удовлетворяет интерфейсу Handler
h := asynq.HandlerFunc(myHandler)

В большинстве случаев вам может потребоваться проверить тип входной задачи Type и обработать его соответственно.

func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
   switch t.Type() {
   case "type1":
      // обработать type1
   case "type2":
      // обработать type2
   case "typeN":
      // обработать typeN
   default:
      return fmt.Errorf("неожиданный тип задачи: %q", t.Type())
   }
}

Как видите, обработчик может состоять из многих различных обработчиков. Каждый случай в приведенном выше примере может быть обработан отдельным обработчиком. В этом месте используется тип ServeMux.

Примечание: для реализации обработчика вам необязательно использовать тип ServeMux, но во многих случаях он может пригодиться.
Используя ServeMux, вы можете зарегистрировать несколько обработчиков. Он будет сопоставлять тип каждой задачи с зарегистрированными шаблонами и вызывать обработчик, соответствующий типу задачи, самому близкому по образцу к имени типа задачи.

mux := asynq.NewServeMux()
mux.Handle("email:welcome", welcomeEmailHandler) // Зарегистрировать обработчик
mux.Handle("email:reminder", reminderEmailHandler)
mux.Handle("email:", defaultEmailHandler) // Обработчик по умолчанию для других типов задач, начинающихся с "email:"

Использование Middleware

Если вам нужно выполнить некоторый код до и/или после обработки запросов, вы можете сделать это с помощью промежуточного программного обеспечения. Middleware - это функция, которая принимает Handler и возвращает Handler.
Ниже приведен пример промежуточного программного обеспечения, которое регистрирует начало и конец обработки задачи.

func loggingMiddleware(h asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        log.Printf("Начало обработки для %q", t.Type())
        err := h.ProcessTask(ctx, t)
        if err != nil {
            return err
        }
        log.Printf("Обработка для %q завершена: затраченное время = %v", t.Type(), time.Since(start))
        return nil
    })
}

Теперь вы можете использовать это промежуточное программное обеспечение для “обертывания” вашего обработчика.

myHandler = loggingMiddleware(myHandler)

Кроме того, если вы используете ServeMux, вы можете указать промежуточное программное обеспечение следующим образом.

mux := NewServeMux()
mux.Use(loggingMiddleware)

Группировка промежуточного ПО

Если у вас есть сценарий, в котором вы хотите применить промежуточное ПО к группе задач, вы можете достичь этого путем объединения нескольких экземпляров ServeMux. Одно ограничение заключается в том, что у каждой группы задач должен быть одинаковый префикс в их типах.

Пример:
Если у вас есть задачи для обработки заказов и задачи для обработки продуктов, и вы хотите применить общую логику ко всем задачам “продуктов” и другую общую логику ко всем задачам “заказов”, вы можете сделать это следующим образом:

handlersПродукт := asynq.NewServeMux()
handlersПродукт.Use(промежуточноеПродукта) // Применить общую логику ко всем задачам "продуктов"
handlersПродукт.HandleFunc("продукт:обновление", обработчикЗадачиПродуктаОбновления)
// ... Регистрация других обработчиков задач "продуктов"

handlersЗаказ := asynq.NewServeMux()
handlersЗаказ.Use(промежуточноеЗаказа) // Применить общую логику ко всем задачам "заказов"
handlersЗаказ.HandleFunc("заказ:возврат", обработчикЗадачиВозврата)
// ... Регистрация других обработчиков задач "заказов"

// Обработчик верхнего уровня
mux := asynq.NewServeMux()
mux.Use(некотороеГлобальноеПромежуточноеПО) // Применить общую логику ко всем задачам
mux.Handle("продукт:", handlersПродукт)
mux.Handle("заказ:", handlersЗаказ)

if err := srv.Run(mux); err != nil {
    log.Fatal(err)
}