Golang RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)

The publish/subscribe pattern in RabbitMQ means that a message sent by a producer will be processed by multiple consumers.

Fanout Mode

Explanation:

  • P represents the producer, C1 and C2 represent consumers, red represents queues, and X represents the exchange.
  • The exchange is responsible for forwarding messages to all queues bound to the exchange.
  • Multiple queues can be defined, each bound to the same exchange.
  • Each queue can have one or more consumers.

Note: If you are not familiar with RabbitMQ, please read the RabbitMQ Basic Concepts section first.

1. Install Dependency Package

go get github.com/streadway/amqp

2. Send Messages

The following steps demonstrate how the message producer sends messages.

2.1. Connect to RabbitMQ Server

// Connect to RabbitMQ Server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Connection address explanation:

amqp://username:password@RabbitMQAddress:Port/

2.2. Create a Channel

Most operations are completed on the channel.

ch, err := conn.Channel()
defer ch.Close()

2.3. Declare an Exchange

Messages are first sent to the exchange. The exchange forwards messages to queues based on its strategy.

err = ch.ExchangeDeclare(
    "tizi365",   // Exchange name
    "fanout", // Exchange type, using fanout type here, i.e., publish/subscribe pattern
    true,     // Durable
    false,    // Auto-deleted
    false,    // Internal
    false,    // No-wait
    nil,      // Arguments
)

2.4. Publish a Message

// Message content
body := "Hello Tizi365.com!"

// Publish the message
err = ch.Publish(
  "tizi365",     // Exchange (exchange name corresponding to the previous declaration)
  "", // Routing key, for fanout type exchange, the routing key is automatically ignored, so it is not necessary to provide one
  false,  // Mandatory
  false,  // Immediate
  amqp.Publishing {
    ContentType: "text/plain", // Message content type, here it is plain text
    Body:        []byte(body),  // Message content
  })

2.5. Complete Message Push Code

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // Connect to rabbitmq
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // Create a channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // Declare an exchange
    err = ch.ExchangeDeclare(
        "tizi365",   // Exchange name
        "fanout", // Exchange type, fanout for publish/subscribe mode
        true,     // Durable
        false,    // Auto-deleted
        false,    // Internal
        false,    // No-wait
        nil,      // Arguments
    )
    failOnError(err, "Failed to declare an exchange")

    // Message content
    body := "Hello Tizi365.com!"
    // Push message
    err = ch.Publish(
        "tizi365",     // Exchange (matching the declaration above)
        "", // Routing key, for fanout type exchanges, the routing key is automatically ignored
        false,  // Mandatory
        false,  // Immediate
        amqp.Publishing {
            ContentType: "text/plain", // Message content type, here is plain text
            Body:        []byte(body),  // Message content
        })

    log.Printf("Sent content %s", body)
}

3. Receive Messages

The first three steps for receiving messages—connecting to RabbitMQ, creating a channel, and declaring an exchange—are the same as sending messages. Refer to the preceding sections 2.1, 2.2, and 2.3.

3.1. Declare a Queue

Declare the queue to be operated on

q, err := ch.QueueDeclare(
        "",    // Queue name, if not specified, a random one will be generated
        false, // Durable
        false, // Delete when unused
        true,  // Exclusive
        false, // No-wait
        nil,   // Arguments
    )

3.2. Bind the Queue to the Exchange

The queue needs to be bound to the exchange to receive messages

err = ch.QueueBind(
        q.Name, // Queue name
        "",     // Routing key, for fanout type exchanges, the routing key is automatically ignored
        "tizi365", // Exchange name, must match the one defined by the message sender
        false,
        nil)

Note: In actual applications, we can define N queues, each bound to the same exchange, in order to receive messages forwarded by the exchange. This is where the publish/subscribe pattern is reflected.

3.3. Create a Consumer

msgs, err := ch.Consume(
        q.Name, // Referencing the queue name from above
        "",     // Consumer name, if not specified, it will be generated automatically
        true,   // Automatically acknowledge that the message has been processed
        false,  // Exclusive
        false,  // No-local
        false,  // No-wait
        nil,    // Args
    )

// Loop to handle messages
for d := range msgs {
    log.Printf("Received message=%s", d.Body)
}

3.4. Complete Consumer Code

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // Connect to RabbitMQ
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // Create a channel, usually one per consumer
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // Declare an exchange
    err = ch.ExchangeDeclare(
        "tizi365",   // Exchange name, should match the one used by the message sender
        "fanout", // Exchange type
        true,     // Durable
        false,    // Auto-deleted
        false,    // Internal
        false,    // No-wait
        nil,      // Arguments
    )
    failOnError(err, "Failed to declare an exchange")

    // Declare the queue to operate on
    q, err := ch.QueueDeclare(
        "",    // Queue name, if empty a random name will be generated
        false, // Durable
        false, // Delete when unused
        true,  // Exclusive
        false, // No-wait
        nil,   // Arguments
    )
    failOnError(err, "Failed to declare a queue")

    // Bind the queue to the specified exchange
    err = ch.QueueBind(
        q.Name, // Queue name
        "",     // Routing key, ignored for fanout exchanges
        "tizi365", // Exchange name, should match the one defined by the message sender
        false,
        nil)
    failOnError(err, "Failed to bind a queue")

    // Create a consumer
    msgs, err := ch.Consume(
        q.Name, // Reference the earlier queue name
        "",     // Consumer name, will be automatically generated if empty
        true,   // Auto-ack
        false,  // Exclusive
        false,  // No-local
        false,  // No-wait
        nil,    // Args
    )
    failOnError(err, "Failed to register a consumer")

    // Consume messages from the queue in a loop
    for d := range msgs {
        log.Printf("Received message: %s", d.Body)
    }
}

3.5. Multiple Consumers

Refer to the Work mode section and simply start multiple consumers using goroutines.