Golang RabbitMQ Publish/Subscribe Pattern(ブロードキャストモード、ファンアウトモード)

RabbitMQにおけるパブリッシュ/サブスクライブパターンは、プロデューサーによって送信されたメッセージが複数のコンシューマーによって処理されることを意味します。

ファンアウトモード

説明:

  • Pはプロデューサーを表し、C1とC2はコンシューマーを表し、赤はキューを表し、Xはエクスチェンジを表します。
  • エクスチェンジは、エクスチェンジにバインドされたすべてのキューにメッセージを転送する責任があります。
  • 複数のキューを定義し、それぞれを同じエクスチェンジにバインドできます。
  • 各キューには1つ以上のコンシューマーを持たせることができます。

注意:RabbitMQに慣れていない場合は、まずRabbitMQ基本概念セクションをお読みください。

1. 依存パッケージのインストール

go get github.com/streadway/amqp

2. メッセージの送信

以下の手順は、メッセージプロデューサーがメッセージを送信する方法を示しています。

2.1. RabbitMQサーバーへの接続

// RabbitMQサーバーに接続
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

接続アドレスの説明:

amqp://ユーザー名:パスワード@RabbitMQアドレス:ポート/

2.2. チャネルの作成

ほとんどの操作はチャネル上で完了します。

ch, err := conn.Channel()
defer ch.Close()

2.3. エクスチェンジの宣言

メッセージはまずエクスチェンジに送られます。エクスチェンジは、その戦略に基づいてメッセージをキューに転送します。

err = ch.ExchangeDeclare(
    "tizi365",   // エクスチェンジ名
    "fanout", // エクスチェンジタイプ、ここではファンアウトタイプを使用しているため、パブリッシュ/サブスクライブパターン
    true,     // 耐久性
    false,    // 自動削除
    false,    // 内部
    false,    // No-wait
    nil,      // 引数
)

2.4. メッセージのパブリッシュ

// メッセージの内容
body := "こんにちは、Tizi365.com!"

// メッセージをパブリッシュ
err = ch.Publish(
  "tizi365",     // エクスチェンジ(前述の宣言に対応するエクスチェンジ名)
  "", // ルーティングキー、ファンアウトタイプのエクスチェンジでは自動的に無視されるため、提供する必要はありません
  false,  // Mandatory
  false,  // Immediate
  amqp.Publishing {
    ContentType: "text/plain", // メッセージの内容タイプ、ここではプレーンテキストです
    Body:        []byte(body),  // メッセージの内容
  })

2.5. メッセージの送信コードの完成

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQに接続
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQへの接続に失敗しました")
	defer conn.Close()

	// チャネルを作成
	ch, err := conn.Channel()
	failOnError(err, "チャネルのオープンに失敗しました")
	defer ch.Close()

	// エクスチェンジの宣言
	err = ch.ExchangeDeclare(
		"tizi365",   // エクスチェンジ名
		"fanout", // エクスチェンジタイプ、パブリッシュ/サブスクライブモードにはファンアウトを指定
		true,     // 耐久性
		false,    // 自動削除
		false,    // 内部
		false,    // No-wait
		nil,      // 引数
	)
	failOnError(err, "エクスチェンジの宣言に失敗しました")

	// メッセージの内容
	body := "こんにちは、Tizi365.com!"
	// メッセージのプッシュ
	err = ch.Publish(
		"tizi365",     // エクスチェンジ(上記の宣言と一致)
		"", // ルーティングキー、ファンアウトタイプのエクスチェンジでは自動的に無視される
		false,  // Mandatory
		false,  // Immediate
		amqp.Publishing {
			ContentType: "text/plain", // メッセージの内容タイプ、ここはプレーンテキスト
			Body:        []byte(body),  // メッセージの内容
		})

	log.Printf("送信内容 %s", body)
}

3. メッセージの受信

メッセージを受信するための最初の3つの手順―RabbitMQへの接続、チャネルの作成、エクスチェンジの宣言―は、メッセージの送信と同じです。前述の2.1、2.2、および2.3のセクションを参照してください。

3.1. キューを宣言する

キューを操作するために宣言する

q, err := ch.QueueDeclare(
		"",    // キュー名。指定しない場合、ランダムな名前が生成されます
		false, // 耐久性
		false, // 未使用時に削除
		true,  // 排他的
		false, // ノーウェイト
		nil,   // 引数
	)

3.2. キューをエクスチェンジにバインドする

メッセージを受信するためには、キューをエクスチェンジにバインドする必要があります

err = ch.QueueBind(
		q.Name, // キュー名
		"",     // ルーティングキー。ファンアウトタイプのエクスチェンジの場合、ルーティングキーは自動的に無視されます
		"tizi365", // エクスチェンジ名。送信元のメッセージと一致する必要があります
		false,
		nil)

注意: 実際のアプリケーションでは、N個のキューを定義し、それぞれを同じエクスチェンジにバインドして、エクスチェンジから転送されるメッセージを受信することができます。これがパブリッシュ/サブスクライブパターンが反映される点です。

3.3. コンシューマを作成する

msgs, err := ch.Consume(
		q.Name, // 上記で参照したキュー名
		"",     // コンシューマ名。指定しない場合、自動的に生成されます
		true,   // メッセージを自動的に処理したことを確認
		false,  // 排他的
		false,  // ローカルでない
		false,  // ノーウェイト
		nil,    // 引数
	)
	
// メッセージを処理するためのループ
for d := range msgs {
	log.Printf("メッセージを受信しました=%s", d.Body)
}

3.4. コンシューマのコードを完成させる

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQに接続
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQへの接続に失敗しました")
	defer conn.Close()

	// 通常、コンシューマごとに1つのチャネルを作成します
	ch, err := conn.Channel()
	failOnError(err, "チャネルのオープンに失敗しました")
	defer ch.Close()

	// エクスチェンジを宣言する
	err = ch.ExchangeDeclare(
		"tizi365",   // エクスチェンジ名。メッセージの送信元と一致する必要があります
		"fanout", // エクスチェンジタイプ
		true,     // 耐久性
		false,    // 自動削除
		false,    // 内部的
		false,    // ノーウェイト
		nil,      // 引数
	)
	failOnError(err, "エクスチェンジの宣言に失敗しました")

	// 操作するキューを宣言する
	q, err := ch.QueueDeclare(
		"",    // キュー名。空の場合、ランダムな名前が生成されます
		false, // 耐久性
		false, // 未使用時に削除
		true,  // 排他的
		false, // ノーウェイト
		nil,   // 引数
	)
	failOnError(err, "キューの宣言に失敗しました")

	// 指定されたエクスチェンジにキューをバインドする
	err = ch.QueueBind(
		q.Name, // キュー名
		"",     // ルーティングキー。ファンアウトエクスチェンジの場合は無視されます
		"tizi365", // エクスチェンジ名。メッセージの送信元と一致する必要があります
		false,
		nil)
	failOnError(err, "キューのバインドに失敗しました")

	// コンシューマを作成する
	msgs, err := ch.Consume(
		q.Name, // 先ほどのキュー名を参照
		"",     // コンシューマ名。空の場合は自動的に生成されます
		true,   // 自動アック
		false,  // 排他的
		false,  // ローカルでない
		false,  // ノーウェイト
		nil,    // 引数
	)
	failOnError(err, "コンシューマの登録に失敗しました")

	// ループ内でキューからメッセージを受信する
	for d := range msgs {
		log.Printf("受信したメッセージ: %s", d.Body)
	}
}

3.5. 複数のコンシューマ

ワークモードセクションを参照して、単純にゴルーチンを使用して複数のコンシューマを起動します。