Golang RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)
In RabbitMQ's publish/subscribe pattern, every message a producer sends is delivered to multiple consumers, each of which receives its own copy.
Explanation:
- P represents the producer, C1 and C2 represent consumers, red represents queues, and X represents the exchange.
- The exchange is responsible for forwarding messages to all queues bound to the exchange.
- Multiple queues can be defined, each bound to the same exchange.
- Each queue can have one or more consumers.
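The routing rule described above can be illustrated without a broker. The following is a minimal in-memory sketch, not RabbitMQ code: the names FanoutExchange, Bind, Publish, and Messages are hypothetical and exist only for this illustration. It models a single point, namely that a fanout exchange copies each message into every bound queue and ignores the routing key.

```go
package main

import "fmt"

// FanoutExchange is a hypothetical in-memory stand-in for a fanout exchange.
// It models only the routing rule, not the AMQP protocol.
type FanoutExchange struct {
	queues map[string][]string // queue name -> messages delivered so far
}

// NewFanoutExchange returns an exchange with no bound queues.
func NewFanoutExchange() *FanoutExchange {
	return &FanoutExchange{queues: make(map[string][]string)}
}

// Bind registers a queue with the exchange. Binding twice has no extra effect.
func (x *FanoutExchange) Bind(queue string) {
	if _, ok := x.queues[queue]; !ok {
		x.queues[queue] = nil
	}
}

// Publish copies the message into every bound queue.
// The routing key is accepted but deliberately ignored, as with fanout.
func (x *FanoutExchange) Publish(routingKey, body string) {
	for name := range x.queues {
		x.queues[name] = append(x.queues[name], body)
	}
}

// Messages returns everything the named queue has received.
func (x *FanoutExchange) Messages(queue string) []string {
	return x.queues[queue]
}

func main() {
	x := NewFanoutExchange()
	x.Bind("q1")
	x.Bind("q2")
	x.Publish("ignored", "hello")
	fmt.Println(x.Messages("q1"), x.Messages("q2")) // prints "[hello] [hello]"
}
```

Both queues end up with the same message, which is the behavior the real exchange provides for the queues C1 and C2 read from.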
Note: If you are not familiar with RabbitMQ, please read the RabbitMQ Basic Concepts section first.
1. Install Dependency Package
go get github.com/streadway/amqp
2. Send Messages
The following steps demonstrate how the message producer sends messages.
2.1. Connect to RabbitMQ Server
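// Connect to RabbitMQ Server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
	// conn is nil on failure, so check the error before deferring Close
	log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()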
Connection address explanation:
amqp://username:password@RabbitMQAddress:Port/
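When the credentials come from configuration rather than a literal string, the address can be assembled with the standard library instead of string concatenation. The sketch below assumes a hypothetical helper named buildAMQPURL; it is not part of the amqp package. Using net/url has the side benefit that special characters in the password are percent-encoded.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// buildAMQPURL is a hypothetical helper that assembles a connection address
// of the form amqp://username:password@host:port/ from its parts.
func buildAMQPURL(user, password, host, port string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, password), // escapes characters such as '@'
		Host:   net.JoinHostPort(host, port),
		Path:   "/",
	}
	return u.String()
}

func main() {
	// Same address as the literal used in this tutorial.
	fmt.Println(buildAMQPURL("guest", "guest", "localhost", "5672"))
	// prints "amqp://guest:guest@localhost:5672/"
}
```

The resulting string can be passed to amqp.Dial in place of the hard-coded address.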
2.2. Create a Channel
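Most API operations are performed on a channel, which is a lightweight virtual connection multiplexed over the TCP connection.
ch, err := conn.Channel()
if err != nil {
	log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()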
2.3. Declare an Exchange
Messages are always published to an exchange first. The exchange then routes each message to queues according to its type.
err = ch.ExchangeDeclare(
	"tizi365", // Exchange name
	"fanout",  // Exchange type; fanout implements the publish/subscribe pattern
	true,      // Durable
	false,     // Auto-deleted
	false,     // Internal
	false,     // No-wait
	nil,       // Arguments
)
2.4. Publish a Message
// Message content
body := "Hello Tizi365.com!"
// Publish the message
err = ch.Publish(
	"tizi365", // Exchange name, matching the earlier declaration
	"",        // Routing key; fanout exchanges ignore it, so it can be left empty
	false,     // Mandatory
	false,     // Immediate
	amqp.Publishing{
		ContentType: "text/plain", // Message content type, plain text here
		Body:        []byte(body), // Message content
	})
2.5. Complete Message Push Code
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Connect to RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// Create a channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Declare an exchange
	err = ch.ExchangeDeclare(
		"tizi365", // Exchange name
		"fanout",  // Exchange type, fanout for publish/subscribe mode
		true,      // Durable
		false,     // Auto-deleted
		false,     // Internal
		false,     // No-wait
		nil,       // Arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// Message content
	body := "Hello Tizi365.com!"

	// Publish the message
	err = ch.Publish(
		"tizi365", // Exchange name, matching the declaration above
		"",        // Routing key; fanout exchanges ignore it
		false,     // Mandatory
		false,     // Immediate
		amqp.Publishing{
			ContentType: "text/plain", // Message content type, plain text here
			Body:        []byte(body), // Message content
		})
	failOnError(err, "Failed to publish a message")
	log.Printf("Sent content %s", body)
}
3. Receive Messages
The first three steps for receiving messages (connecting to RabbitMQ, creating a channel, and declaring an exchange) are the same as for sending messages. See sections 2.1, 2.2, and 2.3.
3.1. Declare a Queue
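Declare the queue this consumer will read from. Leaving the name empty makes the server generate a unique one, and an exclusive queue is deleted when the connection that declared it closes.
q, err := ch.QueueDeclare(
	"",    // Queue name; if empty, the server generates a random one
	false, // Durable
	false, // Delete when unused
	true,  // Exclusive
	false, // No-wait
	nil,   // Arguments
)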
3.2. Bind the Queue to the Exchange
The queue must be bound to the exchange before it can receive messages.
err = ch.QueueBind(
	q.Name,    // Queue name
	"",        // Routing key; fanout exchanges ignore it
	"tizi365", // Exchange name, must match the one declared by the message sender
	false,     // No-wait
	nil,       // Arguments
)
Note: In a real application you can declare any number of queues and bind each one to the same exchange. Every bound queue receives its own copy of each message the exchange forwards, which is what makes this a publish/subscribe pattern.
3.3. Create a Consumer
msgs, err := ch.Consume(
	q.Name, // Queue name, as returned by QueueDeclare above
	"",     // Consumer name; if empty, one is generated automatically
	true,   // Auto-ack: a message counts as acknowledged as soon as it is delivered
	false,  // Exclusive
	false,  // No-local
	false,  // No-wait
	nil,    // Args
)
if err != nil {
	log.Fatalf("Failed to register a consumer: %s", err)
}
// Loop to handle messages; blocks until the channel is closed
for d := range msgs {
	log.Printf("Received message=%s", d.Body)
}
3.4. Complete Consumer Code
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Connect to RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// Create a channel, usually one per consumer
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Declare an exchange
	err = ch.ExchangeDeclare(
		"tizi365", // Exchange name, should match the one used by the message sender
		"fanout",  // Exchange type
		true,      // Durable
		false,     // Auto-deleted
		false,     // Internal
		false,     // No-wait
		nil,       // Arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// Declare the queue to consume from
	q, err := ch.QueueDeclare(
		"",    // Queue name; if empty, a random name is generated
		false, // Durable
		false, // Delete when unused
		true,  // Exclusive
		false, // No-wait
		nil,   // Arguments
	)
	failOnError(err, "Failed to declare a queue")

	// Bind the queue to the specified exchange
	err = ch.QueueBind(
		q.Name,    // Queue name
		"",        // Routing key, ignored for fanout exchanges
		"tizi365", // Exchange name, should match the one declared by the message sender
		false,     // No-wait
		nil,       // Arguments
	)
	failOnError(err, "Failed to bind a queue")

	// Create a consumer
	msgs, err := ch.Consume(
		q.Name, // Queue name, as returned by QueueDeclare above
		"",     // Consumer name; generated automatically if empty
		true,   // Auto-ack
		false,  // Exclusive
		false,  // No-local
		false,  // No-wait
		nil,    // Args
	)
	failOnError(err, "Failed to register a consumer")

	// Consume messages from the queue in a loop
	for d := range msgs {
		log.Printf("Received message: %s", d.Body)
	}
}
3.5. Multiple Consumers
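To run multiple consumers, start each one in its own goroutine, as described in the Work mode section. Note that consumers reading from the same queue share its messages. For every consumer to receive every message, each one needs its own queue bound to the exchange.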
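The goroutine structure for several consumers on one queue can be sketched with a plain Go channel standing in for the delivery channel returned by ch.Consume. This is an illustration under that assumption, not RabbitMQ code, and the function name consume is hypothetical. It shows that workers reading the same queue divide the messages among themselves, so the per-worker counts add up to the number of messages sent.

```go
package main

import (
	"fmt"
	"sync"
)

// consume starts the given number of goroutines, all reading from the same
// queue, and returns how many messages each one handled. It returns once the
// queue has been closed and drained.
func consume(queue <-chan string, workers int) []int {
	counts := make([]int, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range queue {
				counts[id]++ // each goroutine writes only to its own slot
			}
		}(i)
	}
	wg.Wait()
	return counts
}

func main() {
	// A buffered channel plays the role of one queue holding six messages.
	queue := make(chan string, 6)
	for i := 1; i <= 6; i++ {
		queue <- fmt.Sprintf("msg-%d", i)
	}
	close(queue)

	total := 0
	for _, c := range consume(queue, 3) {
		total += c
	}
	fmt.Println("handled in total:", total) // prints "handled in total: 6"
}
```

How the six messages are split between the three workers varies from run to run; only the total is fixed. With a real broker, each goroutine would range over its own delivery channel from ch.Consume instead.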