Bedeutung der Veröffentlichung von Nachrichten in einer Transaktion
Bei der Arbeit mit ereignisgesteuerten Anwendungen kann es vorkommen, dass Sie den Anwendungszustand persistieren und Nachrichten veröffentlichen müssen, um andere Teile des Systems über das gerade Geschehene zu informieren. In einem idealen Szenario möchten Sie den Anwendungszustand persistieren und eine Nachricht innerhalb einer einzigen Transaktion veröffentlichen, da ein Versäumnis dies zu tun zu Dateninkonsistenzen führen kann. Um Datenverarbeitung und die Veröffentlichung von Ereignissen gleichzeitig in einer Transaktion zu bestätigen, müssen Sie in der Lage sein, Nachrichten in dieselbe Datenbank zu veröffentlichen, die für die Datenspeicherung verwendet wird, oder einen Zwei-Phasen-Commit (2PC)-Mechanismus implementieren. Wenn Sie die Message-Broker-Funktionalität nicht in die Datenbank integrieren und kein neues Rad erfinden möchten, können Sie Ihre Arbeit einfach halten, indem Sie das Forwarder-Komponente von Watermill verwenden!
Forwarder-Komponente
Sie können sich den Forwarder als einen Hintergrunddienst vorstellen, der darauf wartet, dass Nachrichten in die Datenbank veröffentlicht werden und sicherstellt, dass sie letztendlich den Message-Broker erreichen.
Um den Forwarder allgemein und einfach zu verwenden zu machen, lauscht er auf einem dedizierten Thema in der Zwischendatenbank und verwendet einen dekorierten Forwarder-Publisher, um die verpackten Nachrichten zu senden. Der Forwarder packt sie aus und sendet sie an das angegebene Zielthema auf dem Message-Broker.
Beispiel
Betrachten wir das folgende Beispiel: Es gibt einen Befehl, der für die Durchführung einer Lotterieverlosung verantwortlich ist. Er muss zufällig einen registrierten Benutzer im System als Gewinner auswählen. Während er dies tut, sollte er auch seine Entscheidung persistieren, indem er einen Datenbankeintrag speichert, der die eindeutige Lotterie-ID mit der ausgewählten Benutzer-ID verknüpft. Zusätzlich sollte, als ein ereignisgesteuertes System, ein LotteryConcluded
-Ereignis veröffentlicht werden, damit andere Komponenten angemessen reagieren können. Genauer gesagt - es wird eine Komponente geben, die die Preise an den Lotteriegewinner versendet. Sie wird das LotteryConcluded
-Ereignis erhalten und mithilfe der in der Einbettung enthaltenen Lotterie-ID mit dem Datenbankeintrag überprüfen, um den Gewinner zu bestimmen.
In unserem Szenario ist die Datenbank MySQL und der Message-Broker Google Pub/Sub, aber es könnten auch jede andere zwei Technologien sein.
Bei der Implementierung eines solchen Befehls können verschiedene Ansätze verfolgt werden. Im Folgenden werden wir drei mögliche Versuche vorstellen und auf ihre Mängel hinweisen.
Zuerst Event veröffentlichen, dann Daten speichern
Bei diesem Ansatz veröffentlicht der Befehl zuerst ein Event und speichert dann die Daten. Obwohl diese Methode in den meisten Fällen korrekt funktionieren kann, sollten potenzielle Probleme identifiziert werden.
Der Befehl muss drei grundlegende Operationen ausführen:
- Einen zufälligen Benutzer
A
als Gewinner auswählen. - Ein
LotteryConcluded
-Event veröffentlichen, um mitzuteilen, dass die LotterieB
beendet ist. - In der Datenbank speichern, dass die Lotterie
B
von BenutzerA
gewonnen wurde.
Jeder Schritt ist anfällig für Fehler, die unseren Befehlsfluss unterbrechen können. Wenn der erste Schritt fehlschlägt, sind die Konsequenzen nicht schwerwiegend - wir müssen einfach einen Fehler zurückgeben und den gesamten Befehl als gescheitert betrachten. Es werden keine Daten gespeichert, und kein Event wird veröffentlicht. Wir können einfach den Befehl erneut ausführen.
Wenn der zweite Schritt fehlschlägt, haben wir das Event noch nicht veröffentlicht oder die Daten in der Datenbank gespeichert. Wir können den Befehl erneut ausführen und es erneut versuchen.
Das interessanteste Szenario ist, was passiert, wenn der dritte Schritt fehlschlägt. Nach dem zweiten Schritt haben wir bereits das Event veröffentlicht, aber letztendlich werden keine Daten in der Datenbank gespeichert. Andere Komponenten erhalten das Signal, dass die Lotterie beendet ist, aber es wird kein Gewinner mit der in dem Event gesendeten Lotterie-ID verknüpft sein. Sie werden den Gewinner nicht überprüfen können, sodass auch ihre Operationen als gescheitert betrachtet werden müssen.
Wir können uns noch aus dieser Situation befreien, aber es können einige manuelle Vorgänge erforderlich sein, wie z.B. den Befehl mit der Lotterie-ID aus dem bereits gesendeten Event erneut auszuführen.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. Veröffentlicht das Event zuerst in Google Cloud Pub/Sub und speichert dann die Daten in MySQL.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// Wenn ein Fehler auftritt, wurde das Event gesendet, aber die Daten wurden noch nicht gespeichert.
if err = simulateError(); err != nil {
logger.Error("Daten konnten nicht gespeichert werden", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
Zuerst Daten speichern, dann das Ereignis veröffentlichen
Im zweiten Ansatz werden wir versuchen, die Mängel des erstmaligen Adressierungsansatzes zu beheben. Um zu verhindern, dass die Fehlermeldung an externe Komponenten durchsickert, wenn der Zustand nicht korrekt in der Datenbank gespeichert ist und Ereignisse nicht veröffentlicht werden, ändern wir die Reihenfolge der Aktionen wie folgt:
- Wähle den Benutzer
A
zufällig als Gewinner der LotterieB
aus. - Behalte die Information bei, dass die Lotterie
B
von BenutzerA
gewonnen wurde, in der Datenbank. - Veröffentliche ein
LotteryConcluded
Ereignis, das darüber informiert, dass die LotterieB
beendet ist.
Ähnlich wie beim ersten Ansatz gibt es keine Konsequenzen, wenn die ersten beiden Aktionen fehlschlagen. Im Falle des Versagens der dritten Aktion werden unsere Daten zwar in der Datenbank gespeichert, aber kein Ereignis wird veröffentlicht. In diesem Fall geben wir die Fehlermeldung an Komponenten außerhalb der Lotterie nicht weiter. Allerdings wird unser Gewinner aufgrund des erwarteten Systemverhaltens den Preis nicht erhalten können, da kein Ereignis an die verantwortliche Komponente für diesen Vorgang übermittelt wird.
Dieses Problem könnte auch durch manuelle Bedienung gelöst werden, d.h., durch manuelle Veröffentlichung des Ereignisses. Aber wir können es besser machen.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 2. Zuerst die Daten in MySQL persistieren und dann das Ereignis direkt an Google Cloud Pub/Sub veröffentlichen.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// Wenn dies fehlschlägt, sind unsere Daten bereits gespeichert, aber kein Ereignis wird veröffentlicht.
if err = simulateError(); err != nil {
logger.Error("Fehler bei der Veröffentlichung des Ereignisses", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
Speichern von Daten und Veröffentlichen von Ereignissen in einer Transaktion
Stellen Sie sich vor, dass unser Befehl die zweite und dritte Aufgabe gleichzeitig ausführen kann. Sie werden atomar durchgeführt, was bedeutet, dass wenn eine Aufgabe fehlschlägt, die andere nicht erfolgreich sein kann. Dies kann durch Nutzung des Transaktionsmechanismus erreicht werden, der bereits in den meisten Datenbanken implementiert ist. Das in unserem Beispiel verwendete MySQL ist eine davon.
Um Daten zu speichern und Ereignisse in einer einzelnen Transaktion zu veröffentlichen, müssen wir in der Lage sein, Nachrichten an MySQL zu veröffentlichen. Da wir den Message Broker nicht im gesamten System auf MySQL umstellen möchten, müssen wir andere Möglichkeiten finden, um dies zu erreichen.
Die gute Nachricht ist: Watermill stellt alle notwendigen Tools zur Verfügung! Wenn Sie eine Datenbank wie MySQL, PostgreSQL (oder eine andere SQL-Datenbank), Firestore oder Bolt verwenden, können Sie Nachrichten an diese veröffentlichen. Die Forwarder-Komponente unterstützt Sie dabei, alle Nachrichten, die Sie in die Datenbank veröffentlichen, auszuwählen und sie an Ihren Message Broker weiterzuleiten.
Sie müssen sicherstellen, dass:
- Ihr Befehl einen Publisher verwendet, der im Kontext einer Datenbanktransaktion arbeitet (z. B. SQL, Firestore, Bolt).
- Die Forwarder-Komponente läuft und verwendet einen Datenbank-Abonnenten sowie einen Message-Broker-Publisher.
In diesem Fall könnte der Befehl wie folgt aussehen:
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
Um die Forwarder-Komponente im Hintergrund arbeiten zu lassen und Nachrichten von MySQL nach Google Pub/Sub weiterzuleiten, müssen Sie sie wie folgt einrichten:
Für den vollständigen Code, siehe github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// Daten in MySQL speichern und Ereignisse in Google Cloud Pub/Sub in einer Transaktion veröffentlichen.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("Die Transaktion wird aufgrund eines Fehlers zurückgesetzt", watermill.LogFields{"error": err.Error()})
// Im Falle eines Fehlers, durch das Rollback der MySQL-Transaktion, können wir sicherstellen, dass die folgenden unerwünschten Szenarien nicht eintreten:
// - Ereignis veröffentlicht, aber Daten nicht gespeichert,
// - Daten gespeichert, aber Ereignis nicht veröffentlicht.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// Dekorieren des Publishers mit einem vom Forwarder-Komponenten verstandenen Umschlag.
publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// Veröffentlichen einer Ankündigung des Abschlusses der Lotterie. Beachten Sie, dass wir hier mithilfe des dekorierten MySQL-Publishers zu einem Google Cloud-Topic veröffentlichen.
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
Wenn Sie mehr über dieses Beispiel erfahren möchten, finden Sie die Implementierung [hier](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).