このページでは、Handler インターフェースの設計について説明します。

サーバーに提供するHandlerは、非同期タスクの処理ロジックの中核です。Handlerの責任は、コンテキストを考慮しながらタスクを受け入れて処理し、処理が失敗した場合はエラーを報告し、後でタスクを再試行できるようにすることです。

インターフェースは次のように定義されています:

type Handler interface {
    ProcessTask(context.Context, *Task) error
}

これは、Handlerの責任を簡潔に説明しているシンプルなインターフェースです。

このハンドラーインターフェースを実装するさまざまな方法があります。

以下は、自分自身の構造体型を定義してタスクを処理する例です:

type MyTaskHandler struct {
   // ... fields
}

// ProcessTask メソッドの実装
func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
   // ... タスク処理ロジック
}

さらに、HandlerFunc アダプタータイプのおかげで、インターフェースに適合する関数を定義することさえできます。

func myHandler(ctx context.Context, t *asynq.Task) error {
    // ... タスク処理ロジック
}

// h は Handler インターフェースに適合します
h := asynq.HandlerFunc(myHandler)

ほとんどの場合、入力タスクの Type を確認して適切に処理する必要があります。

func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
   switch t.Type() {
   case "type1":
      // type1 を処理
   case "type2":
      // type2 を処理
   case "typeN":
      // typeN を処理
   default:
      return fmt.Errorf("予期しないタスクの種類: %q", t.Type())
   }
}

ご覧の通り、ハンドラーは多くの異なるハンドラーで構成することができます。上記の各ケースは、専用のハンドラーによって処理できます。これが ServeMux タイプの役割です。

注意: ServeMux タイプを使用する必要はありませんが、多くの場合非常に便利です。
ServeMux を使用することで、複数のハンドラーを登録することができます。それはそれぞれのタスクのタイプと登録されたパターンを照合し、タスクタイプ名に最も近いパターンに対応するハンドラーを呼び出します。

mux := asynq.NewServeMux()
mux.Handle("email:welcome", welcomeEmailHandler) // ハンドラーを登録
mux.Handle("email:reminder", reminderEmailHandler)
mux.Handle("email:", defaultEmailHandler) // "email:" で始まる他のタスクタイプのデフォルトハンドラー

ミドルウェアの使用

リクエストの処理の前および/または後にコードを実行する必要がある場合、ミドルウェアを使用してこれを実現できます。ミドルウェアは Handler を取り、Handler を返す関数です。
以下は、タスクの処理の開始と終了をログに記録するミドルウェアの例です。

func loggingMiddleware(h asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        log.Printf("Processing started for %q", t.Type())
        err := h.ProcessTask(ctx, t)
        if err != nil {
            return err
        }
        log.Printf("Processing completed for %q: elapsed time = %v", t.Type(), time.Since(start))
        return nil
    })
}

これで、このミドルウェアを使用してハンドラーを “ラップ” することができます。

myHandler = loggingMiddleware(myHandler)

さらに、ServeMux を使用している場合は、以下のようにミドルウェアを提供できます。

mux := NewServeMux()
mux.Use(loggingMiddleware)

グループ化ミドルウェア

特定のシナリオで複数のタスクにミドルウェアを適用したい場合、複数の ServeMux インスタンスを組み合わせることで実現できます。ただし、各タスクグループのタイプ名には同じ接頭辞が必要です

例:
注文を処理するタスクと商品を処理するタスクがあり、すべての “product” タスクに共通のロジックを適用し、他の “order” タスクにも別の共通のロジックを適用したい場合は、次のようにします:

productHandlers := asynq.NewServeMux()
productHandlers.Use(productMiddleware) // すべてのproductタスクに共通のロジックを適用
productHandlers.HandleFunc("product:update", productUpdateTaskHandler)
// ... 他の "product" タスクハンドラを登録

orderHandlers := asynq.NewServeMux()
orderHandlers.Use(orderMiddleware) // すべてのorderタスクに共通のロジックを適用
orderHandlers.HandleFunc("order:refund", orderRefundTaskHandler)
// ... 他の "order" タスクハンドラを登録

// トップレベルのハンドラ
mux := asynq.NewServeMux()
mux.Use(someGlobalMiddleware) // すべてのタスクに共通のロジックを適用
mux.Handle("product:", productHandlers)
mux.Handle("order:", orderHandlers)

if err := srv.Run(mux); err != nil {
    log.Fatal(err)
}