Importance of Publishing Messages in a Transaction
When dealing with event-driven applications, there may be times when you need to persist application state and publish messages to inform other parts of the system about what just happened. Ideally, you would persist the state and publish the message within a single transaction, because failing to do so can lead to data consistency issues. To commit the stored data and the published events atomically, you must either be able to publish messages to the same database you use for storage, or implement a two-phase commit (2PC) mechanism. If you don't want to replace your message broker with the database, and don't want to reinvent the wheel, Watermill's Forwarder component can simplify your work!
Forwarder Component
You can think of the Forwarder as a background daemon that waits for messages to be published to the database and ensures that they eventually reach the message broker.
To make the Forwarder generic and transparent to use, it listens on a dedicated topic in the intermediate database. Your code publishes through a decorated Forwarder publisher, which wraps each message in an envelope that records the message's intended destination topic. The Forwarder unwraps these messages and sends them to the specified target topic on the message broker.
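To make the wrapping step more concrete, here is a minimal stdlib-only sketch of the idea. It is not Watermill's implementation. The `Publisher` interface, the `envelope` struct and its JSON field names, and the topic names are hypothetical, and Watermill defines its own envelope format internally. The decorator sends every message to the single forwarder topic, whatever topic the caller asked for. The forwarder side then unpacks the envelope to recover the real destination.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a simplified, hypothetical stand-in for a Watermill message.
type Message struct {
	UUID    string
	Payload []byte
}

// Publisher is a hypothetical minimal publisher interface.
type Publisher interface {
	Publish(topic string, msg Message) error
}

// envelope carries the original destination topic alongside the message (hypothetical format).
type envelope struct {
	DestinationTopic string `json:"destination_topic"`
	UUID             string `json:"uuid"`
	Payload          []byte `json:"payload"`
}

// recordingPublisher stands in for a database-backed publisher; it records what it receives.
type recordingPublisher struct {
	published map[string][]Message
}

func (p *recordingPublisher) Publish(topic string, msg Message) error {
	p.published[topic] = append(p.published[topic], msg)
	return nil
}

// forwardingPublisher decorates another publisher: the requested topic is stored in the
// envelope, and the envelope itself is always sent to the forwarder topic.
type forwardingPublisher struct {
	wrapped        Publisher
	forwarderTopic string
}

func (p forwardingPublisher) Publish(topic string, msg Message) error {
	body, err := json.Marshal(envelope{DestinationTopic: topic, UUID: msg.UUID, Payload: msg.Payload})
	if err != nil {
		return err
	}
	return p.wrapped.Publish(p.forwarderTopic, Message{UUID: msg.UUID, Payload: body})
}

// unwrap is what the forwarder does with each message read from the forwarder topic:
// it recovers the destination topic and the original message.
func unwrap(msg Message) (string, Message, error) {
	var env envelope
	if err := json.Unmarshal(msg.Payload, &env); err != nil {
		return "", Message{}, err
	}
	return env.DestinationTopic, Message{UUID: env.UUID, Payload: env.Payload}, nil
}

func main() {
	db := &recordingPublisher{published: map[string][]Message{}}
	pub := forwardingPublisher{wrapped: db, forwarderTopic: "forwarder_topic"}

	_ = pub.Publish("lottery-concluded", Message{UUID: "1", Payload: []byte(`{"lottery_id":42}`)})

	stored := db.published["forwarder_topic"][0]
	topic, original, _ := unwrap(stored)
	fmt.Println(topic, string(original.Payload)) // lottery-concluded {"lottery_id":42}
}
```

Because the decorator implements the same interface as the publisher it wraps, the calling code does not change. It still publishes to the "real" topic, and the redirection happens transparently.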
Example
Let's consider the following example: there is a command responsible for conducting a lottery draw. It must randomly select a registered user as the winner. While doing this, it should also persist its decision by storing a database entry that links the unique lottery ID with the selected user's ID. Additionally, because this is an event-driven system, it should publish a `LotteryConcluded` event so that other components can react appropriately. Specifically, a separate component is responsible for sending prizes to lottery winners. It will receive the `LotteryConcluded` event and use the embedded lottery ID to look up the winner in the database.
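The prize component's verification step might look roughly like the sketch below. It assumes a JSON-encoded event and uses an in-memory map in place of the `lotteries` table. The `json:"lottery_id"` tag, the `winners` map, and `handleLotteryConcluded` are hypothetical illustrations, not code from the example repository.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// LotteryConcludedEvent mirrors the event name used in the example; the JSON tag is an assumption.
type LotteryConcludedEvent struct {
	LotteryID int `json:"lottery_id"`
}

// winners stands in for the lotteries table (lottery_id -> winner).
var winners = map[int]string{}

var errNoWinner = errors.New("no winner stored for this lottery")

// handleLotteryConcluded decodes the event and looks up the winner by the embedded lottery ID.
// If no winner is stored, the prize cannot be sent and the handler reports an error.
func handleLotteryConcluded(payload []byte) (string, error) {
	var event LotteryConcludedEvent
	if err := json.Unmarshal(payload, &event); err != nil {
		return "", err
	}
	winner, ok := winners[event.LotteryID]
	if !ok {
		return "", errNoWinner
	}
	return winner, nil
}

func main() {
	winners[7] = "alice"
	payload, _ := json.Marshal(LotteryConcludedEvent{LotteryID: 7})
	winner, err := handleLotteryConcluded(payload)
	fmt.Println(winner, err) // alice <nil>
}
```

This handler is exactly where the consistency problems discussed next become visible. If the event arrives but the database entry is missing, the lookup fails.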
In our scenario, the database is MySQL and the message broker is Google Pub/Sub, but any other pair of technologies would work as well.
There are several ways to implement such a command. Below, we walk through three possible approaches and point out the shortcomings of the first two.
Publish Event First, Then Store Data
In this approach, the command first publishes an event and then stores the data. Although this method may work correctly in most cases, let's try to identify potential issues.
The command needs to execute three basic operations:
- Select a random user `A` as the winner.
- Publish a `LotteryConcluded` event to inform that lottery `B` has ended.
- Store in the database that lottery `B` has been won by user `A`.
Each step is prone to failure, which can disrupt our command flow. If the first step fails, the consequences are not severe: we just return an error and consider the entire command failed. No data is stored and no event is published, so we can simply rerun the command.
If the second step fails, we still haven't published the event or stored the data in the database. We can rerun the command and try again.
The most interesting scenario is a failure in the third step. By then, the event has already been published, but no data is stored in the database. Other components receive the signal that the lottery has ended, but no winner is associated with the lottery ID sent in the event. They can't verify the winner, so their operations must also be considered failed.
We can still get out of this situation, but it may require some manual operations, such as rerunning the command using the lottery ID from the event that was already sent.
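The failure window of this ordering can be reproduced with a toy model. The sketch below is hypothetical, stdlib-only code, not code from the example. A slice stands in for the Pub/Sub topic, a map stands in for the `lotteries` table, and the `failStore` flag plays the role of `simulateError()`. After a failed run, the announced lottery has no stored winner.

```go
package main

import (
	"errors"
	"fmt"
)

// system is a hypothetical in-memory stand-in for the broker and the database.
type system struct {
	published []int          // lottery IDs announced on the broker
	lotteries map[int]string // lottery_id -> winner
}

var errStoreFailed = errors.New("failed to persist data")

// publishThenStore mirrors the publish-first ordering: the event goes out before the write.
func (s *system) publishThenStore(lotteryID int, winner string, failStore bool) error {
	s.published = append(s.published, lotteryID) // the event is already visible to other components
	if failStore {
		return errStoreFailed // the database write fails, so nothing is stored
	}
	s.lotteries[lotteryID] = winner
	return nil
}

// orphanedEvents lists announced lotteries that have no stored winner.
func (s *system) orphanedEvents() []int {
	var orphaned []int
	for _, id := range s.published {
		if _, ok := s.lotteries[id]; !ok {
			orphaned = append(orphaned, id)
		}
	}
	return orphaned
}

func main() {
	s := &system{lotteries: map[int]string{}}
	err := s.publishThenStore(1, "alice", true)
	fmt.Println(err, s.orphanedEvents()) // failed to persist data [1]
}
```

Rerunning the command would fix the missing row. However, nothing in the system tells us to do so, which is why manual intervention is needed.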
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
```go
// ...
// 1. Publish the event to Google Cloud Pub/Sub first, then store the data in MySQL.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// If this fails, the event has already been published, but the data is never stored.
	if err = simulateError(); err != nil {
		logger.Error("Failed to persist data", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...
```
Store Data First, Then Publish the Event
In the second approach, we try to address the shortcomings of the publish-first method. To avoid leaking a failure to external components when the state has not been persisted correctly in the database, we change the order of actions as follows:
- Randomly select user `A` as the winner of lottery `B`.
- Persist in the database the information that lottery `B` has been won by user `A`.
- Publish a `LotteryConcluded` event, informing that lottery `B` has ended.
As in the first approach, if either of the first two actions fails, there are no consequences. If the third action fails, our data is persisted in the database, but no event is published. In this case, we don't leak the failure to components outside the lottery. However, the winner will never receive the prize, which is not the expected system behavior. This happens because no event reaches the component responsible for sending prizes.
This issue could also be resolved by manual operation, i.e., manually publishing the event. But we can do better.
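The mirror-image problem can be sketched with a hypothetical in-memory model. A slice stands in for the topic, a map stands in for the `lotteries` table, and a `failPublish` flag stands in for `simulateError()`. None of these names come from the example. After a failed publish, the winner is stored, but nothing ever announces it.

```go
package main

import (
	"errors"
	"fmt"
)

// system is a hypothetical in-memory stand-in for the broker and the database.
type system struct {
	published []int          // lottery IDs announced on the broker
	lotteries map[int]string // lottery_id -> winner
}

var errPublishFailed = errors.New("failed to publish the event")

// storeThenPublish mirrors the store-first ordering: the write happens before the event.
func (s *system) storeThenPublish(lotteryID int, winner string, failPublish bool) error {
	s.lotteries[lotteryID] = winner // the winner is persisted
	if failPublish {
		return errPublishFailed // the event never leaves the service
	}
	s.published = append(s.published, lotteryID)
	return nil
}

// unannouncedWinners lists stored lotteries whose event was never published,
// i.e. winners who will never receive their prize.
func (s *system) unannouncedWinners() []int {
	announced := map[int]bool{}
	for _, id := range s.published {
		announced[id] = true
	}
	var missing []int
	for id := range s.lotteries {
		if !announced[id] {
			missing = append(missing, id)
		}
	}
	return missing
}

func main() {
	s := &system{lotteries: map[int]string{}}
	err := s.storeThenPublish(1, "alice", true)
	fmt.Println(err, s.unannouncedWinners()) // failed to publish the event [1]
}
```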
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
```go
// ...
// 2. Persist the data to MySQL first, then directly publish the event to Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	// If this fails, our data is already persisted, but no event is published.
	if err = simulateError(); err != nil {
		logger.Error("Failed to publish the event", err, nil)
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}
// ...
```
Storing Data and Publishing Events in a Transaction
Imagine that our command could perform the second and third tasks at the same time. They would be committed atomically, meaning that if one task fails, the other cannot succeed. We can achieve this with the transaction mechanism that most databases already implement, including MySQL, which we use in our example.
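To illustrate what atomicity gives us, the sketch below stages both writes in a hypothetical in-memory transaction: the lottery row and the outgoing message. Both writes are applied only on commit. This is a toy model of what a real database transaction provides, not how MySQL implements it, and `store`, `tx`, and `outbox` are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a toy database with a lotteries table and an outbox of pending messages.
type store struct {
	lotteries map[int]string
	outbox    []string
}

// tx buffers writes; nothing becomes visible in the store until Commit.
type tx struct {
	s         *store
	lotteries map[int]string
	outbox    []string
}

func (s *store) Begin() *tx {
	return &tx{s: s, lotteries: map[int]string{}}
}

func (t *tx) InsertLottery(id int, winner string) { t.lotteries[id] = winner }
func (t *tx) Publish(msg string)                  { t.outbox = append(t.outbox, msg) }

// Commit applies all buffered writes together.
func (t *tx) Commit() {
	for id, w := range t.lotteries {
		t.s.lotteries[id] = w
	}
	t.s.outbox = append(t.s.outbox, t.outbox...)
}

var errSimulated = errors.New("simulated failure")

// concludeLottery stores the winner and "publishes" the event in one transaction.
func concludeLottery(s *store, id int, winner string, fail bool) error {
	t := s.Begin()
	t.InsertLottery(id, winner)
	t.Publish(fmt.Sprintf("LotteryConcluded:%d", id))
	if fail {
		return errSimulated // the tx is discarded, so neither write is applied (a rollback)
	}
	t.Commit()
	return nil
}

func main() {
	s := &store{lotteries: map[int]string{}}
	_ = concludeLottery(s, 1, "alice", true)
	_ = concludeLottery(s, 2, "bob", false)
	fmt.Println(len(s.lotteries), s.outbox) // 1 [LotteryConcluded:2]
}
```

The failed draw leaves no trace at all, and the successful one leaves both its row and its message. Neither of the two partial states from the earlier approaches can occur.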
To store data and publish events in a single transaction, we need to be able to publish messages to MySQL. However, we don't want to replace the message broker with MySQL across the entire system, so we must find another way to achieve this.
The good news is that Watermill provides all the necessary tools! If you are using a database like MySQL, PostgreSQL (or any other SQL database), Firestore, or Bolt, you can publish messages to it. The Forwarder component then picks up all the messages you publish to the database and forwards them to your message broker.
You need to ensure that:
- Your command uses a publisher working in the context of a database transaction (e.g. SQL, Firestore, Bolt).
- The Forwarder component is running, using a database subscriber and a message broker publisher.
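Conceptually, the Forwarder is a loop that drains the database topic and republishes each message to the broker. The hypothetical stdlib-only sketch below models one pass of such a loop, and a real Forwarder would run it continuously. The names `outboxRow`, `broker`, and `forwardOnce` are illustrative. Watermill's Forwarder consumes the database topic through a Watermill subscriber rather than scanning a slice. A row is marked as forwarded only after the broker accepts it. As a result, a broker outage delays messages instead of losing them, which matches the "eventually reach the message broker" behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// outboxRow is a message stored in the database, waiting to be forwarded.
type outboxRow struct {
	Topic     string
	Payload   string
	Forwarded bool
}

// broker is a toy message broker that can be switched offline.
type broker struct {
	online   bool
	received map[string][]string
}

var errBrokerDown = errors.New("broker unavailable")

func (b *broker) Publish(topic, payload string) error {
	if !b.online {
		return errBrokerDown
	}
	b.received[topic] = append(b.received[topic], payload)
	return nil
}

// forwardOnce makes one pass over the outbox and returns how many rows were forwarded.
// A row is marked as forwarded only after the broker accepted it.
func forwardOnce(rows []outboxRow, b *broker) int {
	forwarded := 0
	for i := range rows {
		if rows[i].Forwarded {
			continue
		}
		if err := b.Publish(rows[i].Topic, rows[i].Payload); err != nil {
			break // keep this and later rows pending; retry on the next pass, preserving order
		}
		rows[i].Forwarded = true
		forwarded++
	}
	return forwarded
}

func main() {
	rows := []outboxRow{{Topic: "lottery-concluded", Payload: `{"lottery_id":1}`}}
	b := &broker{received: map[string][]string{}}

	fmt.Println(forwardOnce(rows, b)) // 0: broker offline, nothing forwarded
	b.online = true
	fmt.Println(forwardOnce(rows, b)) // 1: broker back, the pending row goes out
}
```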
In this case, the command may look like this. For messages to actually be forwarded from MySQL to Google Pub/Sub, the Forwarder component must also be set up and running in the background. Its setup is part of the same example.
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
```go
// ...
// Persist data to MySQL and publish events to Google Cloud Pub/Sub in a transaction.
// The named return value lets the deferred function report a failed commit to the caller.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) (err error) {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			// Commit only when every step succeeded; a failed commit is returned instead of ignored.
			err = tx.Commit()
		} else {
			logger.Info("Rolling back the transaction due to an error", watermill.LogFields{"error": err.Error()})
			// In case of error, due to MySQL transaction rollback, we can ensure the following unwanted scenarios do not occur:
			// - Event published but data not persisted,
			// - Data persisted but event not published.
			_ = tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// Decorate the publisher with an envelope understood by the Forwarder component.
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// Publish an announcement of the lottery conclusion event. Note that we publish to a
	// Google Cloud topic here using the decorated MySQL publisher.
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
}
// ...
```
If you want to learn more about this example, please find its implementation [here](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).