ゴーランのRabbitMQのシンプルキューモード
説明: Pは生産者を、Cは消費者を、赤はキューを示します。
注意: RabbitMQに慣れていない場合は、まずRabbitMQ基本概念セクションをお読みください。
1. 依存関係のインストール
go get github.com/streadway/amqp
依存パッケージのインポート
import (
"github.com/streadway/amqp"
)
2. メッセージの送信
以下の手順は、メッセージ生産者がメッセージをプッシュする方法を示します。
2.1. RabbitMQサーバーへの接続
// RabbitMQサーバーへの接続
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
接続アドレスの説明:
amqp://ユーザー名:パスワード@RabbitMQアドレス:ポート/
2.2. チャネルの作成
ほとんどの操作はチャネル上で行われます。
ch, err := conn.Channel()
defer ch.Close()
2.3. キューの宣言
読み書きする必要のあるキューを表します。
q, err := ch.QueueDeclare(
"hello", // キュー名
false, // メッセージの永続性
false, // 使用されていないときにキューを削除
false, // 排他的
false, // No-wait
nil, // 引数
)
2.4. メッセージのプッシュ
// メッセージの内容
body := "Hello World!"
// メッセージをプッシュする
err = ch.Publish(
"", // Exchange (無視する)
q.Name, // ルーティングパラメータ、キュー名をルーティングパラメータとして使用
false, // 強制フラグ
false, // 即時フラグ
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // メッセージの内容
})
2.5. メッセージ送信の完全なコード
package main
// パッケージのインポート
import (
"log"
"github.com/streadway/amqp"
)
// エラー処理
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQに接続
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQへの接続に失敗しました")
defer conn.Close()
// チャネルの作成
ch, err := conn.Channel()
failOnError(err, "チャネルのオープンに失敗しました")
defer ch.Close()
// 操作するキューの宣言
q, err := ch.QueueDeclare(
"hello", // 名前
false, // 永続化
false, // 使用されていないときに削除
false, // 排他的
false, // No-wait
nil, // 引数
)
failOnError(err, "キューの宣言に失敗しました")
// 送信するメッセージの内容
body := "Hello World!"
// メッセージを送信
err = ch.Publish(
"", // Exchange
q.Name, // ルーティングキー
false, // Mandatory
false, // Immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "メッセージの送信に失敗しました")
log.Printf(" [x] Sent %s", body)
}
3. メッセージの受信
メッセージの受信の最初の3つの手順は、メッセージの送信と同じです。それぞれセクション2.1、2.2、および2.3に対応します。 メッセージの受信の完全なコードは以下の通りです:
package main
// パッケージのインポート
import (
"log"
"github.com/streadway/amqp"
)
// エラーハンドリング
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQに接続
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQへの接続に失敗しました")
defer conn.Close()
// チャンネルの作成
ch, err := conn.Channel()
failOnError(err, "チャンネルのオープンに失敗しました")
defer ch.Close()
// 操作対象のキューを宣言
q, err := ch.QueueDeclare(
"hello", // キュー名はメッセージの送信時のキュー名と一致する必要があります
false, // durable
false, // 使用されていないときに削除する
false, // 排他的
false, // no-wait
nil, // 引数
)
failOnError(err, "キューの宣言に失敗しました")
// メッセージのコンシューマを作成
msgs, err := ch.Consume(
q.Name, // キュー名
"", // コンシューマ名。指定しない場合、自動的に一意のIDが生成されます
true, // メッセージを自動的に確認するかどうか、つまり、RabbitMQにメッセージが正常に処理されたことを自動的に通知するかどうか
false, // 排他的
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "コンシューマの登録に失敗しました")
// ループでキューからメッセージを取得
for d := range msgs {
// メッセージの内容を出力
log.Printf("メッセージを受信しました:%s", d.Body)
}
}