開始ガイド

このチュートリアルでは、clientworkers の2つのプログラムを作成します。

  • client.go は、バックグラウンドのワーカースレッドによって非同期で処理されるタスクを作成およびスケジュールします。
  • workers.go は、クライアントによって作成されたタスクを処理するために複数の並行ワーカースレッドを起動します。

このガイドでは、localhost:6379 で Redis サーバーが実行されていることを前提としています。開始する前に、Redis がインストールされ実行されていることを確認してください。

まず、2つのメインファイルを作成しましょう。

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

次に、asynq パッケージをインストールします。

go get -u github.com/hibiken/asynq

コードの記述を開始する前に、これらの2つのプログラムで使用されるいくつかのコアタイプを確認しましょう。

Redis接続オプション

Asynq はメッセージブローカーとして Redis を使用しています。client.go および workers.go は、読み書き操作のために Redis に接続する必要があります。ローカルで実行中の Redis サーバーへの接続を指定するために、RedisClientOpt を使用します。

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // 必要に応じてパスワードは省略可能です
    Password: "mypassword",
    // asynqのために専用のデータベース番号を使用します。
    // デフォルトでは、Redis は16個のデータベース(0から15)を提供します。
    DB: 0,
}

タスク

asynq では、作業単位は Task と呼ばれるタイプでカプセル化され、概念的には TypePayload の2つのフィールドを持っています。

// Type はタスクのタイプを示す文字列値です。
func (t *Task) Type() string

// Payload はタスクの実行に必要なデータです。
func (t *Task) Payload() []byte

これで、コアタイプを見てみましたので、プログラムの記述を開始しましょう。

クライアントプログラム

client.go では、asynq.Client を使用していくつかのタスクを作成し、それらをキューに入れます。

タスクを作成するには、NewTask 関数を使用し、タスクのタイプとペイロードを渡すことができます。

Enqueue メソッドはタスクと任意の数のオプションを取ります。タスクを将来の処理のためにスケジュールするには、ProcessIn または ProcessAt オプションを使用します。

// メールタスクに関連するペイロード。
type EmailTaskPayload struct {
    // メールの受信者のID。
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // タイプ名とペイロードを持つタスクを作成します。
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // タスクをすぐに処理します。
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] タスクを正常にキューに入れました: %+v", info)

    // 24時間後にタスクを処理します。
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] タスクを正常にキューに入れました: %+v", info)
}

これで、クライアントプログラムに必要なすべてが揃いました。

Workers Program

workers.goには、asynq.Serverインスタンスを作成してワーカーを起動します。

NewServer関数はRedisConnOptConfigをパラメータとして受け取ります。

Configはサーバーのタスク処理の挙動を調整するために使用されます。 利用可能なすべての構成オプションについては、Configのドキュメントを参照できます。

この例では、単純化のために、並行性のみを指定します。

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // 注: 以下のセクションでは、`handler`とは何かを紹介します。
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

(*Server).Runメソッドのパラメータは、asynq.Handlerインターフェースであり、ProcessTaskメソッドを持っています。

type Handler interface {
    // タスクが正常に処理された場合、ProcessTaskはnilを返さなければなりません。
    // ProcessTaskが非nilのエラーを返したり、パニックを引き起こした場合、タスクは後で再試行されます。
    ProcessTask(context.Context, *Task) error
}

ハンドラを実装する最も簡単な方法は、同じシグネチャを持つ関数を定義し、それをRunに渡す際にasynq.HandlerFuncアダプタータイプを使用することです。

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] ユーザー %d に歓迎メールを送信中", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] ユーザー %d にリマインダーメールを送信中", p.UserID)

    default:
        return fmt.Errorf("予期しないタスクタイプ: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // 関数を処理するためにasynq.HandlerFuncアダプターを使用します
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

このハンドラ関数のためのswitchケースを追加することができますが、実際のアプリケーションでは、各ケースのロジックを別個の関数で定義する方が便利です。

コードをリファクタリングするために、ServeMuxを使用してハンドラを作成します。"net/http"パッケージからのServeMuxと同様に、HandleまたはHandleFuncを呼び出すことでハンドラを登録できます。ServeMuxHandlerインタフェースを満たしているため、(*Server).Runに渡すことができます。

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ユーザー %d に歓迎メールを送信中", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ユーザー %d にリマインダーメールを送信中", p.UserID)
    return nil
}

各種タスクのハンドリング関数を抽出したので、コードはより整然となりました。ただし、コードはまだやや暗黙的すぎます。タスクタイプとペイロードタイプのためにこれらの文字列値があり、それらを組織的なパッケージにカプセル化する必要があります。コードをリファクタリングし、タスクの作成とハンドリングをカプセル化するためのパッケージを書きます。taskというパッケージを作成するだけです。

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// List of task types.
const (
    TypeWelcomeEmail  = "email:welcome"   // ウェルカムメールの種類
    TypeReminderEmail = "email:reminder"  // リマインダーメールの種類
)

// Payload for any task related to emails.
type EmailTaskPayload struct {
    // ID of the email recipient.
    UserID int  // メール受信者のID
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending welcome email to user %d", p.UserID)  // " [*] ユーザー%dにウェルカムメールを送信"
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending reminder email to user %d", p.UserID)  // " [*] ユーザー%dにリマインダーメールを送信"
    return nil
}

これで、client.goworkers.goでこのパッケージをインポートできます。

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // タスクをすぐにキューに入れます。
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] タスクが正常にキューに入れられました: %+v", info)

    // 24時間後に処理されるようにタスクをキューに入れます。
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] タスクが正常にキューに入れられました: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

コードがより見やすくなりました!

これでclientworkersが準備できたので、これら2つのプログラムを実行できます。まずは、clientプログラムを実行してタスクを作成し、スケジュールしてみましょう。

go run client/client.go

これにより、即座に処理されるタスクと、24時間後に処理されるタスクの2つが作成されます。

次に、asynqのコマンドラインインタフェースを使用してタスクを確認できます。

asynq dash

Enqueued状態のタスクと、Scheduled状態のタスクが1つずつ見えるはずです。

注意:各状態の意味を理解するには、Task Lifecycle を参照してください。

最後に、workersプログラムを起動してタスクを処理できるようにしましょう。

go run workers/workers.go

注意:このプログラムは、終了シグナルが送信されるまで終了しません。バックグラウンドワーカーを安全に終了するベストプラクティスについては、Signals Wiki page を参照してください。

ターミナルにいくつかのテキスト出力が表示され、タスクが正常に処理されたことが示されるはずです。

clientプログラムを再度実行して、ワーカーがタスクをどのように受け入れて処理するかを確認できます。

最初の試行でタスクが正常に処理されない場合があります。デフォルトでは、失敗したタスクは指数バックオフで25回リトライされます。失敗した状況をシミュレートするために、ハンドラを更新してエラーを返すようにしましょう。

// tasks.go```
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ユーザー %d に歓迎のメールを送信しようとしています...", p.UserID)
    return fmt.Errorf("ユーザーへのメール送信に失敗しました")
}

ワーカープログラムを再起動してタスクをキューに追加しましょう。

go run workers/workers.go
go run client/client.go

asynq dash を実行している場合、Retry 状態のタスクを確認できます(キューの詳細ビューに移動し、「retry」タブを強調表示します)。

リトライ状態のタスクを確認するには、次のコマンドも実行できます。

asynq task ls --queue=default --state=retry

これにより、将来再試行されるすべてのタスクがリストされます。出力には、各タスクの次回実行の予定時刻が含まれます。

タスクがリトライ試行を使い果たすと、Archived 状態に移行し、再試行されません(CLI や WebUI ツールを使用してアーカイブされたタスクを手動で実行することはできます)。

このチュートリアルを締めくくる前に、ハンドラを修正しましょう。

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ユーザー %d に歓迎のメールを送信しています", p.UserID)
    return nil 
}