Mode Antrian Sederhana Golang RabbitMQ
Penjelasan: P mewakili produsen, C mewakili konsumen, dan merah mewakili antrian.
Catatan: Jika Anda tidak familiar dengan RabbitMQ, silakan baca bagian Konsep Dasar RabbitMQ terlebih dahulu.
1. Pasang Dependensi
go get github.com/streadway/amqp
Impor paket dependensi
import (
"github.com/streadway/amqp"
)
2. Membuat Pesan
Langkah-langkah berikut menunjukkan bagaimana produsen pesan menyelesaikan dorongan pesan.
2.1. Terhubung ke Server RabbitMQ
// Terhubung ke Server RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Penjelasan alamat koneksi:
amqp://username:katasandi@AlamatRabbitMQ:port/
2.2. Membuat Saluran (Channel)
Sebagian besar operasi dilakukan pada saluran.
ch, err := conn.Channel()
defer ch.Close()
2.3. Mendeklarasikan Antrian
Mewakili antrian yang perlu kami baca atau tulis dari.
q, err := ch.QueueDeclare(
"hello", // Nama antrian
false, // Ketahanan pesan
false, // Hapus antrian saat tidak digunakan
false, // Eksklusif
false, // Tidak menunggu
nil, // Argumen
)
2.4. Mendorong Pesan
// Isi pesan
body := "Halo Dunia!"
// Mendorong pesan
err = ch.Publish(
"", // Pertukaran (abaikan di sini)
q.Name, // Parameter routing, gunakan nama antrian sebagai parameter routing
false, // Wajib
false, // Segera
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // Isi pesan
})
2.5. Kode Lengkap untuk Mengirim Pesan
package main
// Impor paket
import (
"log"
"github.com/streadway/amqp"
)
// Tangani kesalahan
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Terhubung ke RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Gagal terhubung ke RabbitMQ")
defer conn.Close()
// Buat saluran
ch, err := conn.Channel()
failOnError(err, "Gagal membuka saluran")
defer ch.Close()
// Mendeklarasikan antrian untuk dioperasikan
q, err := ch.QueueDeclare(
"hello", // Nama
false, // Tahan lama
false, // Hapus saat tidak digunakan
false, // Eksklusif
false, // Tidak menunggu
nil, // Argumen
)
failOnError(err, "Gagal mendeklarasikan antrian")
// Isi pesan yang akan dikirim
body := "Halo Dunia!"
// Kirim pesan
err = ch.Publish(
"", // Pertukaran
q.Name, // Kunci routing
false, // Wajib
false, // Segera
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Gagal menerbitkan pesan")
log.Printf(" [x] Mengirim %s", body)
}
3. Menerima Pesan
Langkah pertama tiga dalam menerima pesan sama seperti mengirim pesan, sesuai dengan bagian 2.1, 2.2, dan 2.3 secara berurutan. Kode lengkap untuk menerima pesan adalah sebagai berikut:
package main
// Mengimpor paket-paket
import (
"log"
"github.com/streadway/amqp"
)
// Penanganan kesalahan
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Terhubung ke RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Gagal terhubung ke RabbitMQ")
defer conn.Close()
// Membuat sebuah saluran (channel)
ch, err := conn.Channel()
failOnError(err, "Gagal membuka saluran (channel)")
defer ch.Close()
// Mendeklarasikan antrian yang akan dioperasikan
q, err := ch.QueueDeclare(
"hello", // Nama antrian perlu konsisten dengan nama antrian untuk mengirim pesan
false, // persisten
false, // hapus jika tidak terpakai
false, // eksklusif
false, // no-wait
nil, // argumen
)
failOnError(err, "Gagal mendeklarasikan antrian")
// Membuat konsumen pesan
msgs, err := ch.Consume(
q.Name, // Nama antrian
"", // Nama konsumen, jika tidak diisi, ID unik akan dibuat secara otomatis
true, // Apakah secara otomatis mengakui pesan, yaitu, secara otomatis memberitahu RabbitMQ bahwa pesan telah berhasil diproses
false, // eksklusif
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Gagal mendaftarkan konsumen")
// Mengambil pesan dari antrian dalam sebuah loop
for d := range msgs {
// Mencetak isi pesan
log.Printf("Menerima pesan: %s", d.Body)
}
}