開始ガイド
このチュートリアルでは、client
と workers
の2つのプログラムを作成します。
-
client.go
は、バックグラウンドのワーカースレッドによって非同期で処理されるタスクを作成およびスケジュールします。 -
workers.go
は、クライアントによって作成されたタスクを処理するために複数の並行ワーカースレッドを起動します。
このガイドでは、localhost:6379
で Redis サーバーが実行されていることを前提としています。開始する前に、Redis がインストールされ実行されていることを確認してください。
まず、2つのメインファイルを作成しましょう。
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
次に、asynq
パッケージをインストールします。
go get -u github.com/hibiken/asynq
コードの記述を開始する前に、これらの2つのプログラムで使用されるいくつかのコアタイプを確認しましょう。
Redis接続オプション
Asynq はメッセージブローカーとして Redis を使用しています。client.go
および workers.go
は、読み書き操作のために Redis に接続する必要があります。ローカルで実行中の Redis サーバーへの接続を指定するために、RedisClientOpt
を使用します。
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// 必要に応じてパスワードは省略可能です
Password: "mypassword",
// asynqのために専用のデータベース番号を使用します。
// デフォルトでは、Redis は16個のデータベース(0から15)を提供します。
DB: 0,
}
タスク
asynq
では、作業単位は Task
と呼ばれるタイプでカプセル化され、概念的には Type
と Payload
の2つのフィールドを持っています。
// Type はタスクのタイプを示す文字列値です。
func (t *Task) Type() string
// Payload はタスクの実行に必要なデータです。
func (t *Task) Payload() []byte
これで、コアタイプを見てみましたので、プログラムの記述を開始しましょう。
クライアントプログラム
client.go
では、asynq.Client
を使用していくつかのタスクを作成し、それらをキューに入れます。
タスクを作成するには、NewTask
関数を使用し、タスクのタイプとペイロードを渡すことができます。
Enqueue
メソッドはタスクと任意の数のオプションを取ります。タスクを将来の処理のためにスケジュールするには、ProcessIn
または ProcessAt
オプションを使用します。
// メールタスクに関連するペイロード。
type EmailTaskPayload struct {
// メールの受信者のID。
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// タイプ名とペイロードを持つタスクを作成します。
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// タスクをすぐに処理します。
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] タスクを正常にキューに入れました: %+v", info)
// 24時間後にタスクを処理します。
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] タスクを正常にキューに入れました: %+v", info)
}
これで、クライアントプログラムに必要なすべてが揃いました。
Workers Program
workers.go
には、asynq.Server
インスタンスを作成してワーカーを起動します。
NewServer
関数はRedisConnOpt
とConfig
をパラメータとして受け取ります。
Config
はサーバーのタスク処理の挙動を調整するために使用されます。
利用可能なすべての構成オプションについては、Config
のドキュメントを参照できます。
この例では、単純化のために、並行性のみを指定します。
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// 注: 以下のセクションでは、`handler`とは何かを紹介します。
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
(*Server).Run
メソッドのパラメータは、asynq.Handler
インターフェースであり、ProcessTask
メソッドを持っています。
type Handler interface {
// タスクが正常に処理された場合、ProcessTaskはnilを返さなければなりません。
// ProcessTaskが非nilのエラーを返したり、パニックを引き起こした場合、タスクは後で再試行されます。
ProcessTask(context.Context, *Task) error
}
ハンドラを実装する最も簡単な方法は、同じシグネチャを持つ関数を定義し、それをRun
に渡す際にasynq.HandlerFunc
アダプタータイプを使用することです。
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ユーザー %d に歓迎メールを送信中", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ユーザー %d にリマインダーメールを送信中", p.UserID)
default:
return fmt.Errorf("予期しないタスクタイプ: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// 関数を処理するためにasynq.HandlerFuncアダプターを使用します
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
このハンドラ関数のためのswitchケースを追加することができますが、実際のアプリケーションでは、各ケースのロジックを別個の関数で定義する方が便利です。
コードをリファクタリングするために、ServeMux
を使用してハンドラを作成します。"net/http"
パッケージからのServeMux
と同様に、Handle
またはHandleFunc
を呼び出すことでハンドラを登録できます。ServeMux
はHandler
インタフェースを満たしているため、(*Server).Run
に渡すことができます。
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ユーザー %d に歓迎メールを送信中", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ユーザー %d にリマインダーメールを送信中", p.UserID)
return nil
}
各種タスクのハンドリング関数を抽出したので、コードはより整然となりました。ただし、コードはまだやや暗黙的すぎます。タスクタイプとペイロードタイプのためにこれらの文字列値があり、それらを組織的なパッケージにカプセル化する必要があります。コードをリファクタリングし、タスクの作成とハンドリングをカプセル化するためのパッケージを書きます。task
というパッケージを作成するだけです。
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// List of task types.
const (
TypeWelcomeEmail = "email:welcome" // ウェルカムメールの種類
TypeReminderEmail = "email:reminder" // リマインダーメールの種類
)
// Payload for any task related to emails.
type EmailTaskPayload struct {
// ID of the email recipient.
UserID int // メール受信者のID
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Sending welcome email to user %d", p.UserID) // " [*] ユーザー%dにウェルカムメールを送信"
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Sending reminder email to user %d", p.UserID) // " [*] ユーザー%dにリマインダーメールを送信"
return nil
}
これで、client.go
とworkers.go
でこのパッケージをインポートできます。
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// タスクをすぐにキューに入れます。
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] タスクが正常にキューに入れられました: %+v", info)
// 24時間後に処理されるようにタスクをキューに入れます。
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] タスクが正常にキューに入れられました: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
コードがより見やすくなりました!
これでclient
とworkers
が準備できたので、これら2つのプログラムを実行できます。まずは、client
プログラムを実行してタスクを作成し、スケジュールしてみましょう。
go run client/client.go
これにより、即座に処理されるタスクと、24時間後に処理されるタスクの2つが作成されます。
次に、asynq
のコマンドラインインタフェースを使用してタスクを確認できます。
asynq dash
Enqueued状態のタスクと、Scheduled状態のタスクが1つずつ見えるはずです。
注意:各状態の意味を理解するには、Task Lifecycle を参照してください。
最後に、workers
プログラムを起動してタスクを処理できるようにしましょう。
go run workers/workers.go
注意:このプログラムは、終了シグナルが送信されるまで終了しません。バックグラウンドワーカーを安全に終了するベストプラクティスについては、Signals Wiki page を参照してください。
ターミナルにいくつかのテキスト出力が表示され、タスクが正常に処理されたことが示されるはずです。
client
プログラムを再度実行して、ワーカーがタスクをどのように受け入れて処理するかを確認できます。
最初の試行でタスクが正常に処理されない場合があります。デフォルトでは、失敗したタスクは指数バックオフで25回リトライされます。失敗した状況をシミュレートするために、ハンドラを更新してエラーを返すようにしましょう。
// tasks.go```
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ユーザー %d に歓迎のメールを送信しようとしています...", p.UserID)
return fmt.Errorf("ユーザーへのメール送信に失敗しました")
}
ワーカープログラムを再起動してタスクをキューに追加しましょう。
go run workers/workers.go
go run client/client.go
asynq dash
を実行している場合、Retry 状態のタスクを確認できます(キューの詳細ビューに移動し、「retry」タブを強調表示します)。
リトライ状態のタスクを確認するには、次のコマンドも実行できます。
asynq task ls --queue=default --state=retry
これにより、将来再試行されるすべてのタスクがリストされます。出力には、各タスクの次回実行の予定時刻が含まれます。
タスクがリトライ試行を使い果たすと、Archived 状態に移行し、再試行されません(CLI や WebUI ツールを使用してアーカイブされたタスクを手動で実行することはできます)。
このチュートリアルを締めくくる前に、ハンドラを修正しましょう。
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ユーザー %d に歓迎のメールを送信しています", p.UserID)
return nil
}