Importance de la publication de messages dans une transaction
Lors de la manipulation d'applications basées sur des événements, il peut arriver que vous ayez besoin de persister l'état de l'application et de publier des messages pour informer d'autres parties du système de ce qui s'est produit. Dans un scénario idéal, vous voudriez persister l'état de l'application et publier un message dans une seule transaction, car ne pas le faire peut entraîner des problèmes de cohérence des données. Pour valider simultanément le stockage des données et publier des événements dans une seule transaction, vous devez être capable de publier des messages dans la même base de données utilisée pour le stockage des données, ou implémenter un mécanisme de validation en deux phases (2PC). Si vous ne souhaitez pas changer le courtier de messages vers la base de données et ne voulez pas réinventer la roue, vous pouvez simplifier votre travail en utilisant le composant Forwarder de Watermill!
Composant Forwarder
Vous pouvez considérer le Forwarder comme un démon en arrière-plan qui attend que des messages soient publiés dans la base de données et assure qu'ils atteignent finalement le courtier de messages.
Pour rendre le Forwarder générique et transparent à l'utilisation, il écoute sur un sujet dédié dans la base de données intermédiaire et utilise un éditeur Forwarder décoré pour envoyer les messages encapsulés. Le Forwarder les déballe et les envoie vers le sujet cible spécifié sur le courtier de messages.
Exemple
Prenons l'exemple suivant: il existe une commande chargée d'organiser un tirage au sort. Elle doit sélectionner de manière aléatoire un utilisateur enregistré dans le système comme gagnant. Pendant ce processus, elle doit également persister sa décision en stockant une entrée de base de données liant l'identifiant unique du tirage au sort à l'identifiant de l'utilisateur sélectionné. De plus, en tant que système basé sur des événements, il doit également publier un événement TirageAuSortTermine
, afin que d'autres composants puissent réagir de manière appropriée. Pour être précis - il y aura un composant chargé d'envoyer des prix au gagnant du tirage au sort. Il recevra l'événement TirageAuSortTermine
et vérifiera avec l'entrée de base de données en utilisant l'identifiant de tirage au sort intégré pour déterminer le gagnant.
Dans notre scénario, la base de données est MySQL et le courtier de messages est Google Pub/Sub, mais cela pourrait être toute autre paire de technologies.
Lors de l'implémentation d'une telle commande, différentes approches peuvent être adoptées. Dans ce qui suit, nous présenterons trois tentatives possibles et soulignerons leurs lacunes.
Publier l'événement d'abord, puis stocker les données
Dans cette approche, la commande publie d'abord un événement, puis stocke les données. Bien que cette méthode puisse fonctionner correctement dans la plupart des cas, essayons d'identifier les problèmes potentiels.
La commande doit exécuter trois opérations de base :
- Sélectionner un utilisateur aléatoire
A
comme gagnant. - Publier un événement
LotteryConcluded
pour informer que la loterieB
est terminée. - Stocker dans la base de données que la loterie
B
a été gagnée par l'utilisateurA
.
Chaque étape est sujette à l'échec, ce qui peut perturber notre flux de commande. Si la première étape échoue, les conséquences ne seront pas graves - nous devrons simplement renvoyer une erreur et considérer la commande entière comme étant en échec. Aucune donnée ne sera stockée, et aucun événement ne sera publié. Nous pouvons simplement relancer la commande.
Si la deuxième étape échoue, nous n'avons toujours pas publié l'événement ou stocké les données dans la base de données. Nous pouvons relancer la commande et réessayer.
Le scénario le plus intéressant est ce qu'il se passe si la troisième étape échoue. Après la deuxième étape, nous avons déjà publié l'événement, mais au final aucune donnée ne sera stockée dans la base de données. D'autres composants recevront le signal que la loterie est terminée, mais il n'y aura aucun gagnant associé à l'identifiant de loterie envoyé dans l'événement. Ils ne pourront pas vérifier le gagnant, donc leurs opérations doivent également être considérées comme ayant échoué.
Nous pouvons encore nous sortir de cette situation, mais cela peut nécessiter quelques opérations manuelles, comme relancer la commande en utilisant l'identifiant de loterie provenant de l'événement qui a déjà été envoyé.
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. Publier d'abord l'événement sur Google Cloud Pub/Sub, puis stocker les données dans MySQL.
func publierEvenementEtPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// S'il y a une erreur, l'événement a été envoyé mais les données n'ont pas encore été enregistrées.
if err = simulateError(); err != nil {
logger.Error("Échec de la persistance des données", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
Stocker d'abord les données, puis publier l'événement
Dans la deuxième approche, nous allons essayer de remédier aux lacunes de la méthode d'abord. Afin d'éviter de divulguer la situation d'échec à des composants externes lorsque l'état n'est pas correctement persisté dans la base de données et que les événements ne sont pas publiés, nous allons changer l'ordre des actions comme suit :
- Sélectionnez de manière aléatoire l'utilisateur
A
comme gagnant de la loterieB
. - Persister l'information selon laquelle la loterie
B
a été remportée par l'utilisateurA
dans la base de données. - Publier un événement
LotteryConcluded
, informant que la loterieB
est terminée.
Similaire à la première approche, si les deux premières actions échouent, nous n'avons pas de conséquences. En cas d'échec de la troisième action, nos données seront persistées dans la base de données, mais aucun événement ne sera publié. Dans ce cas, nous ne divulguerons pas la situation d'échec à des composants extérieurs à la loterie. Cependant, en tenant compte du comportement attendu du système, notre gagnant ne pourra pas recevoir le prix car aucun événement n'a été transmis au composant responsable de cette opération.
Ce problème pourrait également être résolu par une opération manuelle, c'est-à-dire en publiant manuellement l'événement. Mais nous pouvons faire mieux.
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 2. Persister d'abord les données dans MySQL, puis publier directement l'événement dans Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// Si cela échoue, nos données sont déjà persistées, mais aucun événement n'est publié.
if err = simulateError(); err != nil {
logger.Error("Échec de la publication de l'événement", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
Stockage de données et publication d'événements dans une transaction
Imaginons que notre commande puisse effectuer les deuxième et troisième tâches en même temps. Elles seront validées de manière atomique, ce qui signifie que si une tâche échoue, l'autre ne peut pas réussir. Cela peut être réalisé en utilisant le mécanisme de transaction déjà implémenté dans la plupart des bases de données. Le MySQL utilisé dans notre exemple en fait partie.
Afin de stocker des données et de publier des événements dans une seule transaction, nous devons être en mesure de publier des messages dans MySQL. Comme nous ne voulons pas changer le courtier de messages pour qu'il soit pris en charge par MySQL dans l'ensemble du système, nous devons trouver d'autres moyens d'y parvenir.
La bonne nouvelle est que Watermill fournit tous les outils nécessaires ! Si vous utilisez une base de données telle que MySQL, PostgreSQL (ou toute autre base de données SQL), Firestore ou Bolt, vous pouvez publier des messages à leur intention. Le composant Forwarder vous aidera à sélectionner tous les messages que vous publiez dans la base de données et à les transférer vers votre courtier de messages.
Vous devez vous assurer que :
- Votre commande utilise un éditeur travaillant dans le contexte d'une transaction de base de données (par exemple SQL, Firestore, Bolt).
- Le composant Forwarder est en cours d'exécution, utilisant un abonné de base de données et un éditeur de courtier de messages.
Dans ce cas, la commande peut ressembler à ceci :
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
Pour activer le fonctionnement du composant Forwarder en arrière-plan et transférer des messages de MySQL à Google Pub/Sub, vous devez le configurer comme suit :
Pour le code complet, veuillez vous référer à github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// Persister les données dans MySQL et publier des événements dans Google Cloud Pub/Sub dans une transaction.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("Annulation de la transaction en raison d'une erreur", watermill.LogFields{"error": err.Error()})
// En cas d'erreur, en raison de l'annulation de la transaction MySQL, nous pouvons nous assurer que les scénarios indésirables suivants ne se produisent pas :
// - Événement publié mais données non persistées,
// - Données persistées mais événement non publié.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// Décorer l'éditeur avec une enveloppe comprise par le composant forwarder.
publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// Publier une annonce de l'événement de conclusion de loterie. Veuillez noter que nous publions ici vers un sujet Google Cloud en utilisant l'éditeur MySQL décoré.
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
Si vous souhaitez en savoir plus sur cet exemple, veuillez trouver son implémentation [ici](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).