Golang Padrão de Publicação/Assinatura RabbitMQ (Modo de Transmissão, Modo de Fanout)

O padrão de publicação/assinatura no RabbitMQ significa que uma mensagem enviada por um produtor será processada por vários consumidores.

Modo de Fanout

Explicação:

  • P representa o produtor, C1 e C2 representam os consumidores, vermelho representa filas, e X representa a exchange.
  • A exchange é responsável por encaminhar mensagens para todas as filas vinculadas à exchange.
  • Múltiplas filas podem ser definidas, cada uma vinculada à mesma exchange.
  • Cada fila pode ter um ou mais consumidores.

Observação: Se você não está familiarizado com o RabbitMQ, por favor leia a seção Conceitos Básicos do RabbitMQ primeiro.

1. Instalar Pacote de Dependência

go get github.com/streadway/amqp

2. Enviar Mensagens

Os seguintes passos demonstram como o produtor de mensagens envia mensagens.

2.1. Conectar ao Servidor RabbitMQ

// Conectar ao Servidor RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Explicação do endereço de conexão:

amqp://nomeusuario:senha@EnderecoRabbitMQ:Porta/

2.2. Criar um Canal

A maioria das operações é realizada no canal.

ch, err := conn.Channel()
defer ch.Close()

2.3. Declarar uma Exchange

As mensagens são enviadas primeiro para a exchange. A exchange encaminha as mensagens para as filas com base em sua estratégia.

err = ch.ExchangeDeclare(
	"tizi365",   // Nome da exchange
	"fanout", // Tipo de exchange, usando o tipo fanout aqui, ou seja, padrão de publicação/assinatura
	true,     // Durável
	false,    // Auto-eliminado
	false,    // Interno
	false,    // Sem espera
	nil,      // Argumentos
)

2.4. Publicar uma Mensagem

// Conteúdo da mensagem
body := "Olá Tizi365.com!"

// Publicar a mensagem
err = ch.Publish(
  "tizi365",     // Exchange (nome da exchange correspondente à declaração anterior)
  "", // Chave de roteamento, para exchanges do tipo fanout, a chave de roteamento é automaticamente ignorada, então não é necessário fornecê-la
  false,  // Obrigatório
  false,  // Imediato
  amqp.Publishing {
    ContentType: "text/plain", // Tipo de conteúdo da mensagem, aqui é texto simples
    Body:        []byte(body),  // Conteúdo da mensagem
  })

2.5. Concluir Código de Envio de Mensagem

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Conectar ao RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Falha ao se conectar ao RabbitMQ")
	defer conn.Close()

	// Criar um canal
	ch, err := conn.Channel()
	failOnError(err, "Falha ao abrir um canal")
	defer ch.Close()

	// Declarar uma exchange
	err = ch.ExchangeDeclare(
		"tizi365",   // Nome da exchange
		"fanout", // Tipo da exchange, fanout para o modo de publicação/assinatura
		true,     // Durável
		false,    // Auto-eliminado
		false,    // Interno
		false,    // Sem espera
		nil,      // Argumentos
	)
	failOnError(err, "Falha ao declarar uma exchange")

	// Conteúdo da mensagem
	body := "Olá Tizi365.com!"
	// Enviar mensagem
	err = ch.Publish(
		"tizi365",     // Exchange (correspondendo à declaração acima)
		"", // Chave de roteamento, para exchanges do tipo fanout, a chave de roteamento é automaticamente ignorada
		false,  // Obrigatório
		false,  // Imediato
		amqp.Publishing {
			ContentType: "text/plain", // Tipo de conteúdo da mensagem, aqui é texto simples
			Body:        []byte(body),  // Conteúdo da mensagem
		})

	log.Printf("Enviado o conteúdo %s", body)
}

3. Receber Mensagens

Os primeiros três passos para receber mensagens - conectar ao RabbitMQ, criar um canal e declarar uma exchange - são os mesmos que enviar mensagens. Consulte as seções anteriores 2.1, 2.2 e 2.3.

3.1. Declarar uma Fila

Declare a fila na qual será realizada a operação

q, err := ch.QueueDeclare(
		"",    // Nome da fila, se não especificado, um será gerado aleatoriamente
		false, // Durável
		false, // Excluir quando não utilizada
		true,  // Exclusiva
		false, // Sem espera
		nil,   // Argumentos
	)

3.2. Vincular a Fila à Troca

A fila precisa ser vinculada à troca para receber mensagens

err = ch.QueueBind(
		q.Name, // Nome da fila
		"",     // Chave de roteamento, para trocas do tipo fanout, a chave de roteamento é automaticamente ignorada
		"tizi365", // Nome da troca, deve corresponder àquela definida pelo remetente da mensagem
		false,
		nil)

Observação: Em aplicações reais, podemos definir N filas, cada uma vinculada à mesma troca, a fim de receber mensagens encaminhadas pela troca. Aqui é onde o padrão publicar/se inscrever é refletido.

3.3. Criar um Consumidor

msgs, err := ch.Consume(
		q.Name, // Referenciando o nome da fila acima
		"",     // Nome do consumidor, se não especificado, será gerado automaticamente
		true,   // Reconhecer automaticamente que a mensagem foi processada
		false,  // Exclusiva
		false,  // Não local
		false,  // Sem espera
		nil,    // Args
	)
	
// Loop para manipular as mensagens
for d := range msgs {
	log.Printf("Mensagem recebida=%s", d.Body)
}

3.4. Código do Consumidor Completo

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Conectar ao RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Falha ao conectar ao RabbitMQ")
	defer conn.Close()

	// Criar um canal, geralmente um por consumidor
	ch, err := conn.Channel()
	failOnError(err, "Falha ao abrir um canal")
	defer ch.Close()

	// Declarar uma troca
	err = ch.ExchangeDeclare(
		"tizi365",   // Nome da troca, deve corresponder àquela usada pelo remetente da mensagem
		"fanout", // Tipo de troca
		true,     // Durável
		false,    // Autoexcluível
		false,    // Interna
		false,    // Sem espera
		nil,      // Argumentos
	)
	failOnError(err, "Falha ao declarar uma troca")

	// Declarar a fila na qual será realizada a operação
	q, err := ch.QueueDeclare(
		"",    // Nome da fila, se vazio, um nome aleatório será gerado
		false, // Durável
		false, // Excluir quando não utilizada
		true,  // Exclusiva
		false, // Sem espera
		nil,   // Argumentos
	)
	failOnError(err, "Falha ao declarar uma fila")

	// Vincular a fila à troca especificada
	err = ch.QueueBind(
		q.Name, // Nome da fila
		"",     // Chave de roteamento, ignorada para trocas do tipo fanout
		"tizi365", // Nome da troca, deve corresponder àquela definida pelo remetente da mensagem
		false,
		nil)
	failOnError(err, "Falha ao vincular uma fila")

	// Criar um consumidor
	msgs, err := ch.Consume(
		q.Name, // Referência ao nome da fila anterior
		"",     // Nome do consumidor, será gerado automaticamente se estiver vazio
		true,   // Auto-reconhecer
		false,  // Exclusiva
		false,  // Não local
		false,  // Sem espera
		nil,    // Args
	)
	failOnError(err, "Falha ao registrar um consumidor")

	// Consumir mensagens da fila em um loop
	for d := range msgs {
		log.Printf("Mensagem recebida: %s", d.Body)
	}
}

3.5. Múltiplos Consumidores

Consulte a seção Modo de trabalho e simplesmente inicie vários consumidores usando goroutines.