Pola Penerbitan/Langganan Golang RabbitMQ (Mode Siaran, Mode Fanout)
Pola penerbitan/langganan dalam RabbitMQ berarti pesan yang dikirim oleh produsen akan diproses oleh beberapa konsumen.
Penjelasan:
- P mewakili produsen, C1 dan C2 mewakili konsumen, merah mewakili antrean, dan X mewakili pertukaran.
- Pertukaran bertanggung jawab untuk meneruskan pesan ke semua antrean yang terikat ke pertukaran.
- Beberapa antrean dapat didefinisikan, masing-masing terikat ke pertukaran yang sama.
- Setiap antrean dapat memiliki satu atau lebih konsumen.
Catatan: Jika Anda tidak terbiasa dengan RabbitMQ, harap baca bagian Konsep Dasar RabbitMQ terlebih dahulu.
1. Pasang Paket Ketergantungan
go get github.com/streadway/amqp
2. Kirim Pesan
Langkah-langkah berikut menunjukkan bagaimana penerbit pesan mengirim pesan.
2.1. Terhubung ke Server RabbitMQ
// Terhubung ke Server RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Penjelasan alamat koneksi:
amqp://namaPengguna:kataSandi@AlamatRabbitMQ:Port/
2.2. Buat Saluran (Channel)
Sebagian besar operasi diselesaikan pada saluran.
ch, err := conn.Channel()
defer ch.Close()
2.3. Mendeklarasikan Pertukaran
Pesan pertama kali dikirim ke pertukaran. Pertukaran meneruskan pesan ke antrean berdasarkan strateginya.
err = ch.ExchangeDeclare(
"tizi365", // Nama pertukaran
"fanout", // Tipe pertukaran, menggunakan tipe fanout di sini, yaitu pola penerbitan/langganan
true, // Tahan lama (Durable)
false, // Otomatis dihapus (Auto-deleted)
false, // Internal
false, // No-wait
nil, // Argumen
)
2.4. Mengirim Pesan
// Isi pesan
body := "Halo Tizi365.com!"
// Kirim pesan
err = ch.Publish(
"tizi365", // Pertukaran (nama pertukaran yang sesuai dengan deklarasi sebelumnya)
"", // Kunci routing, untuk pertukaran tipe fanout, kunci routing diabaikan secara otomatis, sehingga tidak perlu menyediakannya
false, // Mandatory
false, // Immediate
amqp.Publishing {
ContentType: "text/plain", // Tipe konten pesan, di sini berupa teks biasa
Body: []byte(body), // Isi pesan
})
2.5. Menyelesaikan Kode Dorongan Pesan
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Terhubung ke RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Gagal terhubung ke RabbitMQ")
defer conn.Close()
// Buat saluran
ch, err := conn.Channel()
failOnError(err, "Gagal membuka saluran")
defer ch.Close()
// Mendeklarasikan pertukaran
err = ch.ExchangeDeclare(
"tizi365", // Nama pertukaran
"fanout", // Tipe pertukaran, fanout untuk mode penerbitan/langganan
true, // Tahan lama
false, // Auto-dihapus
false, // Internal
false, // No-wait
nil, // Argumen
)
failOnError(err, "Gagal mendeklarasikan pertukaran")
// Isi pesan
body := "Halo Tizi365.com!"
// Dorong pesan
err = ch.Publish(
"tizi365", // Pertukaran (sesuai dengan deklarasi di atas)
"", // Kunci routing, untuk pertukaran tipe fanout, kunci routing diabaikan
false, // Mandatory
false, // Immediate
amqp.Publishing {
ContentType: "text/plain", // Tipe konten pesan, di sini teks biasa
Body: []byte(body), // Isi pesan
})
log.Printf("Mengirim konten %s", body)
}
3. Menerima Pesan
Langkah-langkah pertama untuk menerima pesan—terhubung ke RabbitMQ, membuat saluran, dan mendeklarasikan pertukaran—sama dengan mengirim pesan. Perujuk pada bagian sebelumnya: 2.1, 2.2, dan 2.3.
3.1. Mendeklarasikan Antrian
Mendeklarasikan antrian yang akan dioperasikan
q, err := ch.QueueDeclare(
"", // Nama antrian, jika tidak diisi, akan dibuat acak
false, // Tahan lama
false, // Hapus saat tidak digunakan
true, // Eksklusif
false, // Tidak ada penundaan
nil, // Argumen
)
3.2. Mengikat Antrian ke Exchange
Antrian perlu diikat ke pertukaran untuk menerima pesan
err = ch.QueueBind(
q.Name, // Nama antrian
"", // Kunci routing, untuk pertukaran tipe fanout, kunci routing diabaikan secara otomatis
"tizi365", // Nama Exchange, harus sesuai dengan yang ditentukan oleh pengirim pesan
false,
nil)
Catatan: Dalam aplikasi sebenarnya, kita dapat mendefinisikan N antrian, masing-masing diikat ke pertukaran yang sama, untuk menerima pesan yang diteruskan oleh pertukaran. Di sinilah pola penerbit/berlangganan tercermin.
3.3. Membuat Konsumen
msgs, err := ch.Consume(
q.Name, // Merujuk nama antrian dari atas
"", // Nama konsumen, jika tidak ditentukan, akan dibuat secara otomatis
true, // Otomatis mengakui bahwa pesan telah diproses
false, // Eksklusif
false, // Tidak lokal
false, // Tidak ada penundaan
nil, // Argumen
)
// Melakukan pengulangan untuk menangani pesan
for d := range msgs {
log.Printf("Menerima pesan=%s", d.Body)
}
3.4. Menyelesaikan Kode Konsumen
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Terhubung ke RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Gagal terhubung ke RabbitMQ")
defer conn.Close()
// Membuat saluran, biasanya satu per konsumen
ch, err := conn.Channel()
failOnError(err, "Gagal membuka saluran")
defer ch.Close()
// Mendeklarasikan sebuah pertukaran
err = ch.ExchangeDeclare(
"tizi365", // Nama Exchange, harus sesuai dengan yang digunakan oleh pengirim pesan
"fanout", // Tipe Exchange
true, // Tahan lama
false, // Auto-hapus
false, // Internal
false, // Tidak ada penundaan
nil, // Argumen
)
failOnError(err, "Gagal mendeklarasikan pertukaran")
// Mendeklarasikan antrian yang akan dioperasikan
q, err := ch.QueueDeclare(
"", // Nama antrian, jika kosong, nama acak akan dibuat
false, // Tahan lama
false, // Hapus saat tidak digunakan
true, // Eksklusif
false, // Tidak ada penundaan
nil, // Argumen
)
failOnError(err, "Gagal mendeklarasikan antrian")
// Mengikat antrian ke pertukaran yang ditentukan
err = ch.QueueBind(
q.Name, // Nama antrian
"", // Kunci routing, diabaikan untuk pertukaran fanout
"tizi365", // Nama Exchange, harus sesuai dengan yang ditentukan oleh pengirim pesan
false,
nil)
failOnError(err, "Gagal mengikat antrian")
// Membuat konsumen
msgs, err := ch.Consume(
q.Name, // Merujuk nama antrian sebelumnya
"", // Nama konsumen, akan dibuat secara otomatis jika kosong
true, // Auto-ack
false, // Eksklusif
false, // Tidak lokal
false, // Tidak ada penundaan
nil, // Argumen
)
failOnError(err, "Gagal mendaftarkan konsumen")
// Mengonsumsi pesan dari antrian dalam pengulangan
for d := range msgs {
log.Printf("Menerima pesan: %s", d.Body)
}
}
3.5. Konsumen Berganda
Lihat bagian Pola Kerja dan cukup mulai beberapa konsumen menggunakan goroutine.