Importancia de Publicar Mensajes en una Transacción
Cuando se trata de aplicaciones basadas en eventos, puede haber momentos en los que necesite persistir el estado de la aplicación y publicar mensajes para informar a otras partes del sistema sobre lo que acaba de suceder. En un escenario ideal, desearía persistir el estado de la aplicación y publicar un mensaje dentro de una sola transacción, ya que no hacerlo puede generar problemas de consistencia de datos. Para confirmar simultáneamente el almacenamiento de datos y publicar eventos dentro de una transacción, debe poder publicar mensajes en la misma base de datos utilizada para el almacenamiento de datos, o implementar un mecanismo de confirmación en dos fases (2PC). Si no desea cambiar el servidor de mensajes a la base de datos y no quiere reinventar la rueda, puede simplificar su trabajo utilizando el componente Forwarder de Watermill.
Componente Forwarder
Puede pensar en el Forwarder como un demonio en segundo plano que espera a que se publiquen mensajes en la base de datos y se asegura de que eventualmente lleguen al servidor de mensajes.
Para hacer que el Forwarder sea genérico y transparente de usar, escucha en un tema dedicado en la base de datos intermedia y utiliza un publicador Forwarder decorado para enviar los mensajes encapsulados. El Forwarder los desempaqueta y los envía al tema especificado en el servidor de mensajes.
Ejemplo
Consideremos el siguiente ejemplo: hay un comando responsable de llevar a cabo un sorteo de lotería. Debe seleccionar aleatoriamente a un usuario registrado en el sistema como el ganador. Mientras hace esto, también debe persistir su decisión almacenando una entrada en la base de datos que vincule el ID único de la lotería con el ID del usuario seleccionado. Además, como sistema basado en eventos, también debe publicar un evento LoteríaConcluida
, para que otros componentes puedan reaccionar adecuadamente. Para ser precisos, habrá un componente responsable de enviar premios al ganador de la lotería. Recibirá el evento LoteríaConcluida
y verificará con la entrada de la base de datos que utiliza el ID de la lotería incrustado para determinar al ganador.
En nuestro escenario, la base de datos es MySQL y el servidor de mensajes es Google Pub/Sub, pero podrían ser otras dos tecnologías.
Al implementar un comando como este, se pueden tomar varios enfoques. A continuación, presentaremos tres intentos posibles y señalaremos sus deficiencias.
Publicar evento primero, luego almacenar datos
En este enfoque, el comando primero publica un evento y luego almacena los datos. Aunque este método puede funcionar correctamente en la mayoría de los casos, tratemos de identificar posibles problemas.
El comando debe ejecutar tres operaciones básicas:
- Seleccionar un usuario aleatorio
A
como ganador. - Publicar un evento
LotteryConcluded
para informar que la loteríaB
ha finalizado. - Almacenar en la base de datos que la lotería
B
ha sido ganada por el usuarioA
.
Cada paso es propenso a fallas, lo que puede interrumpir nuestro flujo de comandos. Si el primer paso falla, las consecuencias no serán graves; simplemente necesitamos devolver un error y considerar que todo el comando ha fallado. No se almacenarán datos y no se publicará ningún evento. Simplemente podemos volver a ejecutar el comando.
Si el segundo paso falla, todavía no hemos publicado el evento ni almacenado los datos en la base de datos. Podemos volver a ejecutar el comando e intentarlo de nuevo.
El escenario más interesante es lo que sucede si el tercer paso falla. Después del segundo paso, ya hemos publicado el evento, pero en última instancia, no se almacenarán datos en la base de datos. Otros componentes recibirán la señal de que la lotería ha finalizado, pero no habrá un ganador asociado con la ID de lotería enviada en el evento. No podrán verificar al ganador, por lo que sus operaciones también deben considerarse como fallidas.
Todavía podemos salir de esta situación, pero puede requerir algunas operaciones manuales, como volver a ejecutar el comando utilizando la ID de lotería del evento que ya se ha enviado.
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. Publicar el evento en Google Cloud Pub/Sub primero, luego almacenar los datos en MySQL.
func publicarEventoYPersistirDatos(IDLoteria int, usuarioSeleccionado string, registrador watermill.LoggerAdapter) error {
publicador, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
registrador,
)
if err != nil {
return err
}
evento := LotteryConcludedEvent{LotteryID: IDLoteria}
payload, err := json.Marshal(evento)
if err != nil {
return err
}
err = publicador.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// Si hay un error, el evento se ha enviado pero los datos aún no se han guardado.
if err = simularError(); err != nil {
registrador.Error("Error al persistir datos", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, IDLoteria, usuarioSeleccionado)
if err != nil {
return err
}
// ...
Almacenar datos primero, luego publicar el evento
En el segundo enfoque, intentaremos abordar las deficiencias del método de dirección primero. Para evitar que la situación de fallo se filtre a componentes externos cuando el estado no se guarda correctamente en la base de datos y los eventos no se publican, cambiaremos el orden de las acciones de la siguiente manera:
- Seleccionar aleatoriamente al usuario
A
como el ganador de la loteríaB
. - Persistir la información de que la lotería
B
ha sido ganada por el usuarioA
en la base de datos. - Publicar un evento
LotteryConcluded
, informando que la loteríaB
ha terminado.
Similar al primer enfoque, si las dos primeras acciones fallan, no tendremos consecuencias. En el caso de la falla de la tercera acción, nuestros datos se persistirán en la base de datos, pero no se publicará ningún evento. En este caso, no filtraremos la situación de falla a los componentes fuera de la lotería. Sin embargo, considerando el comportamiento esperado del sistema, nuestro ganador no podrá recibir el premio porque ningún evento se pasará al componente responsable de esta operación.
Este problema también se podría resolver mediante una operación manual, es decir, publicando manualmente el evento. Pero podemos hacerlo mejor.
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 2. Persistir los datos en MySQL primero, luego publicar directamente el evento en Google Cloud Pub/Sub.
func persistirDatosYPublicarEvento(idLoteria int, usuarioElegido string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, idLoteria, usuarioElegido)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
evento := EventoLoteriaConcluida{IDLoteria: idLoteria}
payload, err := json.Marshal(evento)
if err != nil {
return err
}
// Si esto falla, nuestros datos ya están persistidos, pero no se publica ningún evento.
if err = simularError(); err != nil {
logger.Error("Error al publicar el evento", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
Almacenamiento de datos y publicación de eventos en una transacción
Imagina que nuestro comando puede realizar la segunda y tercera tareas al mismo tiempo. Se comprometerán atómicamente, lo que significa que si una tarea falla, la otra no puede tener éxito. Esto se puede lograr aprovechando el mecanismo de transacción ya implementado en la mayoría de las bases de datos. El MySQL utilizado en nuestro ejemplo es uno de ellos.
Para almacenar datos y publicar eventos en una sola transacción, necesitamos poder publicar mensajes en MySQL. Dado que no queremos cambiar el message broker para que sea compatible con MySQL en todo el sistema, debemos encontrar otras formas de lograr esto.
¡La buena noticia es que Watermill proporciona todas las herramientas necesarias! Si estás utilizando una base de datos como MySQL, PostgreSQL (o cualquier otra base de datos SQL), Firestore o Bolt, puedes publicar mensajes en ellas. El componente Forwarder te ayudará a seleccionar todos los mensajes que publiques en la base de datos y enviarlos a tu message broker.
Debes asegurarte de que:
- Tu comando utilice un publicador que funcione en el contexto de una transacción de base de datos (por ejemplo, SQL, Firestore, Bolt).
- El componente Forwarder esté en ejecución, utilizando un suscriptor de base de datos y un publicador de message broker.
En este caso, el comando puede lucir así:
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
Para habilitar que el componente Forwarder funcione en segundo plano y reenvíe mensajes desde MySQL a Google Pub/Sub, necesitas configurarlo de la siguiente manera:
Para ver el código completo, consulta github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// Persistir datos en MySQL y publicar eventos en Google Cloud Pub/Sub en una transacción.
func persistirDatosYPublicarEventoEnTransaccion(idLoteria int, usuarioSeleccionado string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("Revirtiendo la transacción debido a un error", watermill.LogFields{"error": err.Error()})
// En caso de error, debido a la reversión de la transacción de MySQL, podemos asegurar que no ocurran los siguientes escenarios no deseados:
// - Evento publicado pero datos no persistidos,
// - Datos persistidos pero evento no publicado.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO loterias (id_loteria, ganador) VALUES(?, ?)`, idLoteria, usuarioSeleccionado)
if err != nil {
return err
}
var publicador message.Publisher
publicador, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// Decorar el publicador con un sobre entendido por el componente forwarder.
publicador = forwarder.NewPublisher(publicador, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// Publicar un anuncio del evento de conclusión de la lotería. Ten en cuenta que publicamos a un tema de Google Cloud aquí utilizando el publicador MySQL decorado.
evento := EventoConcluidoLoteria{IDLoteria: idLoteria}
payload, err := json.Marshal(evento)
if err != nil {
return err
}
err = publicador.Publish(topicEventoGoogleCloud, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
Si desea obtener más información sobre este ejemplo, por favor encuentre su implementación [aquí](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).