Pola Penerbitan/Langganan Golang RabbitMQ (Mode Siaran, Mode Fanout)

Pola penerbitan/langganan dalam RabbitMQ berarti pesan yang dikirim oleh produsen akan diproses oleh beberapa konsumen.

Mode Fanout

Penjelasan:

  • P mewakili produsen, C1 dan C2 mewakili konsumen, merah mewakili antrean, dan X mewakili pertukaran.
  • Pertukaran bertanggung jawab untuk meneruskan pesan ke semua antrean yang terikat ke pertukaran.
  • Beberapa antrean dapat didefinisikan, masing-masing terikat ke pertukaran yang sama.
  • Setiap antrean dapat memiliki satu atau lebih konsumen.

Catatan: Jika Anda tidak terbiasa dengan RabbitMQ, harap baca bagian Konsep Dasar RabbitMQ terlebih dahulu.

1. Pasang Paket Ketergantungan

go get github.com/streadway/amqp

2. Kirim Pesan

Langkah-langkah berikut menunjukkan bagaimana penerbit pesan mengirim pesan.

2.1. Terhubung ke Server RabbitMQ

// Terhubung ke Server RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Penjelasan alamat koneksi:

amqp://namaPengguna:kataSandi@AlamatRabbitMQ:Port/

2.2. Buat Saluran (Channel)

Sebagian besar operasi diselesaikan pada saluran.

ch, err := conn.Channel()
defer ch.Close()

2.3. Mendeklarasikan Pertukaran

Pesan pertama kali dikirim ke pertukaran. Pertukaran meneruskan pesan ke antrean berdasarkan strateginya.

err = ch.ExchangeDeclare(
	"tizi365",   // Nama pertukaran
	"fanout", // Tipe pertukaran, menggunakan tipe fanout di sini, yaitu pola penerbitan/langganan
	true,     // Tahan lama (Durable)
	false,    // Otomatis dihapus (Auto-deleted)
	false,    // Internal
	false,    // No-wait
	nil,      // Argumen
)

2.4. Mengirim Pesan

// Isi pesan
body := "Halo Tizi365.com!"

// Kirim pesan
err = ch.Publish(
  "tizi365",     // Pertukaran (nama pertukaran yang sesuai dengan deklarasi sebelumnya)
  "", // Kunci routing, untuk pertukaran tipe fanout, kunci routing diabaikan secara otomatis, sehingga tidak perlu menyediakannya
  false,  // Mandatory
  false,  // Immediate
  amqp.Publishing {
    ContentType: "text/plain", // Tipe konten pesan, di sini berupa teks biasa
    Body:        []byte(body),  // Isi pesan
  })

2.5. Menyelesaikan Kode Dorongan Pesan

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Terhubung ke RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Gagal terhubung ke RabbitMQ")
	defer conn.Close()

	// Buat saluran
	ch, err := conn.Channel()
	failOnError(err, "Gagal membuka saluran")
	defer ch.Close()

	// Mendeklarasikan pertukaran
	err = ch.ExchangeDeclare(
		"tizi365",   // Nama pertukaran
		"fanout", // Tipe pertukaran, fanout untuk mode penerbitan/langganan
		true,     // Tahan lama
		false,    // Auto-dihapus
		false,    // Internal
		false,    // No-wait
		nil,      // Argumen
	)
	failOnError(err, "Gagal mendeklarasikan pertukaran")

	// Isi pesan
	body := "Halo Tizi365.com!"
	// Dorong pesan
	err = ch.Publish(
		"tizi365",     // Pertukaran (sesuai dengan deklarasi di atas)
		"", // Kunci routing, untuk pertukaran tipe fanout, kunci routing diabaikan
		false,  // Mandatory
		false,  // Immediate
		amqp.Publishing {
			ContentType: "text/plain", // Tipe konten pesan, di sini teks biasa
			Body:        []byte(body),  // Isi pesan
		})

	log.Printf("Mengirim konten %s", body)
}

3. Menerima Pesan

Langkah-langkah pertama untuk menerima pesan—terhubung ke RabbitMQ, membuat saluran, dan mendeklarasikan pertukaran—sama dengan mengirim pesan. Perujuk pada bagian sebelumnya: 2.1, 2.2, dan 2.3.

3.1. Mendeklarasikan Antrian

Mendeklarasikan antrian yang akan dioperasikan

q, err := ch.QueueDeclare(
		"",    // Nama antrian, jika tidak diisi, akan dibuat acak
		false, // Tahan lama
		false, // Hapus saat tidak digunakan
		true,  // Eksklusif
		false, // Tidak ada penundaan
		nil,   // Argumen
	)

3.2. Mengikat Antrian ke Exchange

Antrian perlu diikat ke pertukaran untuk menerima pesan

err = ch.QueueBind(
		q.Name, // Nama antrian
		"",     // Kunci routing, untuk pertukaran tipe fanout, kunci routing diabaikan secara otomatis
		"tizi365", // Nama Exchange, harus sesuai dengan yang ditentukan oleh pengirim pesan
		false,
		nil)

Catatan: Dalam aplikasi sebenarnya, kita dapat mendefinisikan N antrian, masing-masing diikat ke pertukaran yang sama, untuk menerima pesan yang diteruskan oleh pertukaran. Di sinilah pola penerbit/berlangganan tercermin.

3.3. Membuat Konsumen

msgs, err := ch.Consume(
		q.Name, // Merujuk nama antrian dari atas
		"",     // Nama konsumen, jika tidak ditentukan, akan dibuat secara otomatis
		true,   // Otomatis mengakui bahwa pesan telah diproses
		false,  // Eksklusif
		false,  // Tidak lokal
		false,  // Tidak ada penundaan
		nil,    // Argumen
	)
	
// Melakukan pengulangan untuk menangani pesan
for d := range msgs {
	log.Printf("Menerima pesan=%s", d.Body)
}

3.4. Menyelesaikan Kode Konsumen

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Terhubung ke RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Gagal terhubung ke RabbitMQ")
	defer conn.Close()

	// Membuat saluran, biasanya satu per konsumen
	ch, err := conn.Channel()
	failOnError(err, "Gagal membuka saluran")
	defer ch.Close()

	// Mendeklarasikan sebuah pertukaran
	err = ch.ExchangeDeclare(
		"tizi365",   // Nama Exchange, harus sesuai dengan yang digunakan oleh pengirim pesan
		"fanout", // Tipe Exchange
		true,     // Tahan lama
		false,    // Auto-hapus
		false,    // Internal
		false,    // Tidak ada penundaan
		nil,      // Argumen
	)
	failOnError(err, "Gagal mendeklarasikan pertukaran")

	// Mendeklarasikan antrian yang akan dioperasikan
	q, err := ch.QueueDeclare(
		"",    // Nama antrian, jika kosong, nama acak akan dibuat
		false, // Tahan lama
		false, // Hapus saat tidak digunakan
		true,  // Eksklusif
		false, // Tidak ada penundaan
		nil,   // Argumen
	)
	failOnError(err, "Gagal mendeklarasikan antrian")

	// Mengikat antrian ke pertukaran yang ditentukan
	err = ch.QueueBind(
		q.Name, // Nama antrian
		"",     // Kunci routing, diabaikan untuk pertukaran fanout
		"tizi365", // Nama Exchange, harus sesuai dengan yang ditentukan oleh pengirim pesan
		false,
		nil)
	failOnError(err, "Gagal mengikat antrian")

	// Membuat konsumen
	msgs, err := ch.Consume(
		q.Name, // Merujuk nama antrian sebelumnya
		"",     // Nama konsumen, akan dibuat secara otomatis jika kosong
		true,   // Auto-ack
		false,  // Eksklusif
		false,  // Tidak lokal
		false,  // Tidak ada penundaan
		nil,    // Argumen
	)
	failOnError(err, "Gagal mendaftarkan konsumen")

	// Mengonsumsi pesan dari antrian dalam pengulangan
	for d := range msgs {
		log.Printf("Menerima pesan: %s", d.Body)
	}
}

3.5. Konsumen Berganda

Lihat bagian Pola Kerja dan cukup mulai beberapa konsumen menggunakan goroutine.