ゴーランのRabbitMQのシンプルキューモード

Golang RabbitMQ

説明: Pは生産者を、Cは消費者を、赤はキューを示します。

注意: RabbitMQに慣れていない場合は、まずRabbitMQ基本概念セクションをお読みください。

1. 依存関係のインストール

go get github.com/streadway/amqp

依存パッケージのインポート

import (
  "github.com/streadway/amqp"
)

2. メッセージの送信

以下の手順は、メッセージ生産者がメッセージをプッシュする方法を示します。

2.1. RabbitMQサーバーへの接続

// RabbitMQサーバーへの接続
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

接続アドレスの説明:

amqp://ユーザー名:パスワード@RabbitMQアドレス:ポート/

2.2. チャネルの作成

ほとんどの操作はチャネル上で行われます。

ch, err := conn.Channel()
defer ch.Close()

2.3. キューの宣言

読み書きする必要のあるキューを表します。

q, err := ch.QueueDeclare(
  "hello", // キュー名
  false,   // メッセージの永続性
  false,   // 使用されていないときにキューを削除
  false,   // 排他的
  false,   // No-wait
  nil,     // 引数
)

2.4. メッセージのプッシュ

// メッセージの内容
body := "Hello World!"

// メッセージをプッシュする
err = ch.Publish(
  "",     // Exchange (無視する)
  q.Name, // ルーティングパラメータ、キュー名をルーティングパラメータとして使用
  false,  // 強制フラグ
  false,  // 即時フラグ
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),  // メッセージの内容
  })

2.5. メッセージ送信の完全なコード

package main

// パッケージのインポート
import (
	"log"
	"github.com/streadway/amqp"
)

// エラー処理
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQに接続
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQへの接続に失敗しました")
	defer conn.Close()

	// チャネルの作成
	ch, err := conn.Channel()
	failOnError(err, "チャネルのオープンに失敗しました")
	defer ch.Close()

	// 操作するキューの宣言
	q, err := ch.QueueDeclare(
		"hello", // 名前
		false,   // 永続化
		false,   // 使用されていないときに削除
		false,   // 排他的
		false,   // No-wait
		nil,     // 引数
	)
	failOnError(err, "キューの宣言に失敗しました")

	// 送信するメッセージの内容
	body := "Hello World!"

	// メッセージを送信
	err = ch.Publish(
		"",     // Exchange
		q.Name, // ルーティングキー
		false,  // Mandatory
		false,  // Immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "メッセージの送信に失敗しました")
	log.Printf(" [x] Sent %s", body)
}

3. メッセージの受信

メッセージの受信の最初の3つの手順は、メッセージの送信と同じです。それぞれセクション2.1、2.2、および2.3に対応します。 メッセージの受信の完全なコードは以下の通りです:

package main

// パッケージのインポート
import (
	"log"
	"github.com/streadway/amqp"
)

// エラーハンドリング
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQに接続
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQへの接続に失敗しました")
	defer conn.Close()

	// チャンネルの作成
	ch, err := conn.Channel()
	failOnError(err, "チャンネルのオープンに失敗しました")
	defer ch.Close()
	
	// 操作対象のキューを宣言
	q, err := ch.QueueDeclare(
		"hello", // キュー名はメッセージの送信時のキュー名と一致する必要があります
		false,   // durable
		false,   // 使用されていないときに削除する
		false,   // 排他的
		false,   // no-wait
		nil,     // 引数
	)
	failOnError(err, "キューの宣言に失敗しました")

	// メッセージのコンシューマを作成
	msgs, err := ch.Consume(
		q.Name, // キュー名
		"",     // コンシューマ名。指定しない場合、自動的に一意のIDが生成されます
		true,   // メッセージを自動的に確認するかどうか、つまり、RabbitMQにメッセージが正常に処理されたことを自動的に通知するかどうか
		false,  // 排他的
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "コンシューマの登録に失敗しました")
	
	// ループでキューからメッセージを取得
	for d := range msgs {
		// メッセージの内容を出力
		log.Printf("メッセージを受信しました:%s", d.Body)
	}
}