Importância de Publicar Mensagens em uma Transação
Ao lidar com aplicações orientadas a eventos, pode haver momentos em que você precisa persistir o estado da aplicação e publicar mensagens para informar outras partes do sistema sobre o que acabou de acontecer. Em um cenário ideal, você gostaria de persistir o estado da aplicação e publicar uma mensagem dentro de uma única transação, pois falhar em fazê-lo pode levar a problemas de consistência de dados. Para confirmar simultaneamente o armazenamento de dados e publicar eventos dentro de uma transação, você deve ser capaz de publicar mensagens no mesmo banco de dados usado para o armazenamento de dados, ou implementar um mecanismo de confirmação em duas fases (2PC). Se você não quer alterar o corretor de mensagens para o banco de dados e não quer reinventar a roda, você pode simplificar seu trabalho usando o componente Forwarder do Watermill!
Componente Forwarder
Você pode pensar no Forwarder como um daemon em segundo plano que espera mensagens a serem publicadas no banco de dados e garante que elas cheguem eventualmente ao corretor de mensagens.
Para tornar o Forwarder genérico e transparente de usar, ele escuta um tópico dedicado no banco de dados intermediário e usa um publicador Forwarder decorado para enviar as mensagens encapsuladas. O Forwarder as desempacota e as envia para o tópico de destino especificado no corretor de mensagens.
Exemplo
Vamos considerar o seguinte exemplo: há um comando responsável por realizar um sorteio de loteria. Ele deve selecionar aleatoriamente um usuário registrado no sistema como o vencedor. Durante isso, ele também deve persistir sua decisão armazenando uma entrada no banco de dados que associa o ID exclusivo da loteria ao ID do usuário selecionado. Além disso, como um sistema orientado a eventos, ele também deve publicar um evento LoteriaConcluída
, para que outros componentes possam reagir adequadamente. Para ser preciso - haverá um componente responsável por enviar prêmios ao vencedor da loteria. Ele receberá o evento LoteriaConcluída
e verificará com a entrada no banco de dados usando o ID da loteria incorporado para determinar o vencedor.
Em nosso cenário, o banco de dados é o MySQL e o corretor de mensagens é o Google Pub/Sub, mas poderia ser qualquer outra combinação de duas tecnologias.
Ao implementar um comando como esse, várias abordagens podem ser adotadas. A seguir, apresentaremos três tentativas possíveis e destacaremos suas deficiências.
Publicar Evento Primeiro, Depois Armazenar Dados
Nesta abordagem, o comando primeiro publica um evento e depois armazena os dados. Embora este método possa funcionar corretamente na maioria dos casos, vamos tentar identificar possíveis problemas.
O comando precisa executar três operações básicas:
- Selecionar um usuário aleatório
A
como o vencedor. - Publicar um evento
LotteryConcluded
para informar que a loteriaB
terminou. - Armazenar no banco de dados que a loteria
B
foi ganha pelo usuárioA
.
Cada etapa está sujeita a falhas, o que pode interromper o fluxo do nosso comando. Se a primeira etapa falhar, as consequências não serão graves - só precisamos retornar um erro e considerar o comando inteiro como falhado. Nenhum dado será armazenado e nenhum evento será publicado. Podemos simplesmente executar o comando novamente.
Se a segunda etapa falhar, ainda não teremos publicado o evento nem armazenado os dados no banco de dados. Podemos executar o comando novamente e tentar outra vez.
O cenário mais interessante é o que acontece se a terceira etapa falhar. Após a segunda etapa, já teremos publicado o evento, mas, no final, nenhum dado será armazenado no banco de dados. Outros componentes receberão a sinalização de que a loteria terminou, mas não haverá vencedor associado ao ID da loteria enviado no evento. Eles não poderão verificar o vencedor, então suas operações também devem ser consideradas como falhas.
Ainda podemos sair dessa situação, mas pode exigir algumas operações manuais, como executar novamente o comando usando o ID da loteria do evento que foi enviado.
Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. Publicar o evento no Google Cloud Pub/Sub primeiro, depois armazenar os dados no MySQL.
func publicarEventoEEArmazenarDados(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// Se houver um erro, o evento será enviado, mas os dados ainda não foram salvos.
if err = simularErro(); err != nil {
logger.Error("Falha ao persistir os dados", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
Armazenar dados primeiro, depois publicar o evento
Na segunda abordagem, tentaremos abordar as deficiências do método de endereçamento primeiro. Para evitar vazar a situação de falha para componentes externos quando o estado não está corretamente persistido no banco de dados e os eventos não são publicados, mudaremos a ordem das ações da seguinte forma:
- Selecionar aleatoriamente o usuário
A
como o vencedor da loteriaB
. - Persistir a informação de que a loteria
B
foi ganha pelo usuárioA
no banco de dados. - Publicar um evento
LotteryConcluded
, informando que a loteriaB
chegou ao fim.
Similar à primeira abordagem, se as duas primeiras ações falharem, não teremos consequências. No caso de falha na terceira ação, nossos dados serão persistidos no banco de dados, mas nenhum evento será publicado. Nesse caso, não vazaremos a situação de falha para componentes fora da loteria. No entanto, considerando o comportamento esperado do sistema, nosso vencedor não será capaz de receber o prêmio porque nenhum evento é enviado ao componente responsável por esta operação.
Este problema também poderia ser resolvido por operação manual, ou seja, publicando manualmente o evento. Mas podemos fazer melhor.
Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 2. Persistir os dados no MySQL primeiro, e então publicar diretamente o evento para o Google Cloud Pub/Sub.
func persistirDadosEEnviarEvento(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// Se isso falhar, nossos dados já estão persistidos, mas nenhum evento é publicado.
if err = simularErro(); err != nil {
logger.Error("Falha ao publicar o evento", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
Armazenar dados e Publicar Eventos em uma Transação
Imagine que nosso comando possa executar a segunda e terceira tarefas ao mesmo tempo. Elas serão comprometidas atomicamente, o que significa que se uma tarefa falhar, a outra não poderá ter sucesso. Isso pode ser alcançado aproveitando o mecanismo de transação já implementado na maioria dos bancos de dados. O MySQL usado em nosso exemplo é um deles.
Para armazenar dados e publicar eventos em uma única transação, precisamos ser capazes de publicar mensagens no MySQL. Como não queremos mudar o corretor de mensagens para ser suportado pelo MySQL em todo o sistema, devemos encontrar outras maneiras de alcançar isso.
A boa notícia é: o Watermill fornece todas as ferramentas necessárias! Se você estiver usando um banco de dados como MySQL, PostgreSQL (ou qualquer outro banco de dados SQL), Firestore ou Bolt, poderá publicar mensagens neles. O componente Forwarder ajudará você a selecionar todas as mensagens que você publica no banco de dados e encaminhá-las para o seu corretor de mensagens.
Você precisa garantir que:
- Seu comando use um publicador que funcione no contexto de uma transação de banco de dados (por exemplo, SQL, Firestore, Bolt).
- O componente Forwarder está em execução, usando um assinante de banco de dados e um publicador de corretor de mensagens.
Neste caso, o comando pode parecer com isto:
Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
Para habilitar o componente Forwarder para funcionar em segundo plano e encaminhar mensagens do MySQL para o Google Pub/Sub, você precisa configurá-lo da seguinte forma:
Para ver o código completo, consulte github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// Persistir dados no MySQL e publicar eventos no Google Cloud Pub/Sub em uma transação.
func persistirDadosEpublicarEventoEmTransacao(idLoteria int, usuarioSelecionado string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("Revertendo a transação devido a um erro", watermill.LogFields{"erro": err.Error()})
// Em caso de erro, devido ao rollback da transação do MySQL, podemos garantir que os seguintes cenários indesejados não ocorram:
// - Evento publicado mas dados não persistidos,
// - Dados persistidos mas evento não publicado.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO loterias (id_loteria, vencedor) VALUES(?, ?)`, idLoteria, usuarioSelecionado)
if err != nil {
return err
}
var publicador message.Publisher
publicador, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// Decorar o publicador com um envelope compreendido pelo componente forwarder.
publicador = forwarder.NewPublisher(publicador, forwarder.PublisherConfig{
TopicoForwarder: topicoForwarderSQL,
})
// Publicar um anúncio do evento de conclusão da loteria. Observe que publicamos um tópico do Google Cloud aqui usando o publicador MySQL decorado.
evento := EventoConclusaoLoteria{IdLoteria: idLoteria}
payload, err := json.Marshal(evento)
if err != nil {
return err
}
err = publicador.Publish(topicoEventoGoogleCloud, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
Se quiser aprender mais sobre este exemplo, por favor encontre a sua implementação [aqui](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).