Golang Padrão de Publicação/Assinatura RabbitMQ (Modo de Transmissão, Modo de Fanout)
O padrão de publicação/assinatura no RabbitMQ significa que uma mensagem enviada por um produtor será processada por vários consumidores.
Explicação:
- P representa o produtor, C1 e C2 representam os consumidores, vermelho representa filas, e X representa a exchange.
- A exchange é responsável por encaminhar mensagens para todas as filas vinculadas à exchange.
- Múltiplas filas podem ser definidas, cada uma vinculada à mesma exchange.
- Cada fila pode ter um ou mais consumidores.
Observação: Se você não está familiarizado com o RabbitMQ, por favor leia a seção Conceitos Básicos do RabbitMQ primeiro.
1. Instalar Pacote de Dependência
go get github.com/streadway/amqp
2. Enviar Mensagens
Os seguintes passos demonstram como o produtor de mensagens envia mensagens.
2.1. Conectar ao Servidor RabbitMQ
// Conectar ao Servidor RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Explicação do endereço de conexão:
amqp://nomeusuario:senha@EnderecoRabbitMQ:Porta/
2.2. Criar um Canal
A maioria das operações é realizada no canal.
ch, err := conn.Channel()
defer ch.Close()
2.3. Declarar uma Exchange
As mensagens são enviadas primeiro para a exchange. A exchange encaminha as mensagens para as filas com base em sua estratégia.
err = ch.ExchangeDeclare(
"tizi365", // Nome da exchange
"fanout", // Tipo de exchange, usando o tipo fanout aqui, ou seja, padrão de publicação/assinatura
true, // Durável
false, // Auto-eliminado
false, // Interno
false, // Sem espera
nil, // Argumentos
)
2.4. Publicar uma Mensagem
// Conteúdo da mensagem
body := "Olá Tizi365.com!"
// Publicar a mensagem
err = ch.Publish(
"tizi365", // Exchange (nome da exchange correspondente à declaração anterior)
"", // Chave de roteamento, para exchanges do tipo fanout, a chave de roteamento é automaticamente ignorada, então não é necessário fornecê-la
false, // Obrigatório
false, // Imediato
amqp.Publishing {
ContentType: "text/plain", // Tipo de conteúdo da mensagem, aqui é texto simples
Body: []byte(body), // Conteúdo da mensagem
})
2.5. Concluir Código de Envio de Mensagem
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Conectar ao RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Falha ao se conectar ao RabbitMQ")
defer conn.Close()
// Criar um canal
ch, err := conn.Channel()
failOnError(err, "Falha ao abrir um canal")
defer ch.Close()
// Declarar uma exchange
err = ch.ExchangeDeclare(
"tizi365", // Nome da exchange
"fanout", // Tipo da exchange, fanout para o modo de publicação/assinatura
true, // Durável
false, // Auto-eliminado
false, // Interno
false, // Sem espera
nil, // Argumentos
)
failOnError(err, "Falha ao declarar uma exchange")
// Conteúdo da mensagem
body := "Olá Tizi365.com!"
// Enviar mensagem
err = ch.Publish(
"tizi365", // Exchange (correspondendo à declaração acima)
"", // Chave de roteamento, para exchanges do tipo fanout, a chave de roteamento é automaticamente ignorada
false, // Obrigatório
false, // Imediato
amqp.Publishing {
ContentType: "text/plain", // Tipo de conteúdo da mensagem, aqui é texto simples
Body: []byte(body), // Conteúdo da mensagem
})
log.Printf("Enviado o conteúdo %s", body)
}
3. Receber Mensagens
Os primeiros três passos para receber mensagens - conectar ao RabbitMQ, criar um canal e declarar uma exchange - são os mesmos que enviar mensagens. Consulte as seções anteriores 2.1, 2.2 e 2.3.
3.1. Declarar uma Fila
Declare a fila na qual será realizada a operação
q, err := ch.QueueDeclare(
"", // Nome da fila, se não especificado, um será gerado aleatoriamente
false, // Durável
false, // Excluir quando não utilizada
true, // Exclusiva
false, // Sem espera
nil, // Argumentos
)
3.2. Vincular a Fila à Troca
A fila precisa ser vinculada à troca para receber mensagens
err = ch.QueueBind(
q.Name, // Nome da fila
"", // Chave de roteamento, para trocas do tipo fanout, a chave de roteamento é automaticamente ignorada
"tizi365", // Nome da troca, deve corresponder àquela definida pelo remetente da mensagem
false,
nil)
Observação: Em aplicações reais, podemos definir N filas, cada uma vinculada à mesma troca, a fim de receber mensagens encaminhadas pela troca. Aqui é onde o padrão publicar/se inscrever é refletido.
3.3. Criar um Consumidor
msgs, err := ch.Consume(
q.Name, // Referenciando o nome da fila acima
"", // Nome do consumidor, se não especificado, será gerado automaticamente
true, // Reconhecer automaticamente que a mensagem foi processada
false, // Exclusiva
false, // Não local
false, // Sem espera
nil, // Args
)
// Loop para manipular as mensagens
for d := range msgs {
log.Printf("Mensagem recebida=%s", d.Body)
}
3.4. Código do Consumidor Completo
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Conectar ao RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Falha ao conectar ao RabbitMQ")
defer conn.Close()
// Criar um canal, geralmente um por consumidor
ch, err := conn.Channel()
failOnError(err, "Falha ao abrir um canal")
defer ch.Close()
// Declarar uma troca
err = ch.ExchangeDeclare(
"tizi365", // Nome da troca, deve corresponder àquela usada pelo remetente da mensagem
"fanout", // Tipo de troca
true, // Durável
false, // Autoexcluível
false, // Interna
false, // Sem espera
nil, // Argumentos
)
failOnError(err, "Falha ao declarar uma troca")
// Declarar a fila na qual será realizada a operação
q, err := ch.QueueDeclare(
"", // Nome da fila, se vazio, um nome aleatório será gerado
false, // Durável
false, // Excluir quando não utilizada
true, // Exclusiva
false, // Sem espera
nil, // Argumentos
)
failOnError(err, "Falha ao declarar uma fila")
// Vincular a fila à troca especificada
err = ch.QueueBind(
q.Name, // Nome da fila
"", // Chave de roteamento, ignorada para trocas do tipo fanout
"tizi365", // Nome da troca, deve corresponder àquela definida pelo remetente da mensagem
false,
nil)
failOnError(err, "Falha ao vincular uma fila")
// Criar um consumidor
msgs, err := ch.Consume(
q.Name, // Referência ao nome da fila anterior
"", // Nome do consumidor, será gerado automaticamente se estiver vazio
true, // Auto-reconhecer
false, // Exclusiva
false, // Não local
false, // Sem espera
nil, // Args
)
failOnError(err, "Falha ao registrar um consumidor")
// Consumir mensagens da fila em um loop
for d := range msgs {
log.Printf("Mensagem recebida: %s", d.Body)
}
}
3.5. Múltiplos Consumidores
Consulte a seção Modo de trabalho e simplesmente inicie vários consumidores usando goroutines.