Важность публикации сообщений в транзакции
При работе с приложениями, основанными на событиях, могут возникать ситуации, когда необходимо сохранить состояние приложения и опубликовать сообщения, чтобы уведомить другие части системы о произошедшем. В идеальном сценарии вы захотите сохранить состояние приложения и опубликовать сообщение в рамках одной транзакции, поскольку несоблюдение этого принципа может привести к проблемам с целостностью данных. Для одновременного подтверждения хранения данных и публикации событий в рамках одной транзакции необходимо иметь возможность публиковать сообщения в той же базе данных, которая используется для хранения данных, или реализовать механизм двухфазного подтверждения (2PC). Если вы не хотите изменять брокер сообщений в базу данных и не хотите изобретать велосипед, вы можете упростить свою работу, используя компонент Forwarder в Watermill!
Компонент Forwarder
Вы можете рассматривать Forwarder как фоновый демон, который ожидает публикации сообщений в базу данных и гарантирует, что они в конечном итоге попадут в брокер сообщений.
Чтобы сделать Forwarder универсальным и прозрачным в использовании, он слушает выделенную тему в промежуточной базе данных и использует декорированный публикатор Forwarder для отправки инкапсулированных сообщений. Forwarder их распаковывает и отправляет на указанную целевую тему в брокере сообщений.
Пример
Давайте рассмотрим следующий пример: есть команда, ответственная за проведение лотереи. Она должна случайным образом выбрать зарегистрированного пользователя в системе в качестве победителя. При этом она также должна сохранить свое решение, сохранив запись в базе данных, связывающую уникальный идентификатор лотереи с идентификатором выбранного пользователя. Кроме того, как система, основанная на событиях, она также должна опубликовать событие LotteryConcluded
, чтобы другие компоненты могли реагировать соответственно. Для точности - будет компонент, ответственный за отправку призов победителю лотереи. Он получит событие LotteryConcluded
и проверит запись в базе данных, используя встроенный идентификатор лотереи, чтобы определить победителя.
В нашем сценарии используется база данных MySQL и брокер сообщений Google Pub/Sub, но могут использоваться и другие две технологии.
При реализации такой команды можно выбрать различные подходы. В дальнейшем мы рассмотрим три возможных попытки и выявим их недостатки.
Сначала опубликуйте событие, затем сохраните данные
В этом подходе команда сначала публикует событие, а затем сохраняет данные. Хотя этот метод может работать правильно в большинстве случаев, давайте попробуем выявить потенциальные проблемы.
Команда должна выполнить три основных операции:
- Выбрать случайного пользователя
A
как победителя. - Опубликовать событие
LotteryConcluded
, чтобы сообщить, что лотереяB
завершилась. - Сохранить в базе данных, что лотерея
B
была выиграна пользователемA
.
Каждый шаг подвержен отказам, которые могут нарушить нашу команду. Если первый шаг завершится неудачно, последствия не будут серьезными - нам просто нужно вернуть ошибку и рассматривать всю команду как неудавшуюся. Данные не будут сохранены, и событие не будет опубликовано. Мы просто можем повторно запустить команду.
Если второй шаг завершится неудачно, мы все еще не опубликовали событие или сохраняли данные в базе данных. Мы можем повторно запустить команду и попробовать снова.
Самый интересный сценарий - что произойдет, если третий шаг завершится неудачно. После второго шага мы уже опубликовали событие, но в конечном итоге данные не будут сохранены в базе данных. Другие компоненты получат сигнал о том, что лотерея завершилась, но победитель, связанный с идентификатором лотереи, отправленным в событии, отсутствовать. Они не смогут проверить победителя, поэтому их операции также должны быть признаны неудачными.
Мы все равно можем выйти из этой ситуации, но это может потребовать некоторых ручных операций, например, повторного запуска команды с использованием идентификатора лотереи из уже отправленного события.
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. Сначала опубликуйте событие в Google Cloud Pub/Sub, затем сохраните данные в MySQL.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// Если произошла ошибка, событие было отправлено, но данные еще не сохранены.
if err = simulateError(); err != nil {
logger.Error("Не удалось сохранить данные", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
Сначала сохраните данные, затем опубликуйте событие
Во втором подходе мы попытаемся устранить недостатки метода сначала обработки. Чтобы предотвратить утечку ситуации с ошибкой во внешние компоненты, когда состояние не правильно сохранено в базе данных и события не опубликованы, мы изменяем порядок действий следующим образом:
- Случайным образом выбрать пользователя
A
в качестве победителя лотереиB
. - Сохранить информацию о том, что лотерея
B
выиграна пользователемA
в базе данных. - Опубликовать событие
LotteryConcluded
, информируя о завершении лотереиB
.
Как и в первом подходе, если первые два действия завершаются неудачно, у нас не будет последствий. В случае сбоя третьего действия наши данные будут сохранены в базе данных, но событие не будет опубликовано. В этом случае мы не узнаем о сбое внешним компонентам лотереи. Однако, с учётом ожидаемого поведения системы, наш победитель не сможет получить приз, потому что ответственному компоненту для этой операции событие не будет передано.
Эту проблему также можно решить вручную, то есть вручную опубликовать событие. Но мы можем поступить лучше.
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 2. Сначала сохраняем данные в MySQL, затем сразу же публикуем событие в Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// Если это оказывается неудачным, наши данные уже сохранены, но событие не опубликовано.
if err = simulateError(); err != nil {
logger.Error("Сбой при публикации события", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
Хранение данных и публикация событий в транзакции
Представьте, что наша команда может выполнить вторую и третью задачи одновременно. Они будут атомарно фиксироваться, что означает, что если одна задача не выполнится, другая не может быть выполнена. Это можно достичь, используя механизм транзакций, уже реализованный в большинстве баз данных. Используемый в нашем примере MySQL - одна из них.
Чтобы хранить данные и публиковать события в одной транзакции, нам нужно иметь возможность публиковать сообщения в MySQL. Поскольку мы не хотим изменять брокер сообщений на поддерживаемый MySQL во всей системе, мы должны найти другие способы достижения этой цели.
Хорошая новость заключается в том, что Watermill предоставляет все необходимые инструменты! Если вы используете базу данных, такую как MySQL, PostgreSQL (или любую другую SQL-базу данных), Firestore или Bolt, вы можете публиковать сообщения в них. Компонент Forwarder поможет вам выбрать все сообщения, которые вы публикуете в базу данных, и пересылать их в ваш брокер сообщений.
Вам необходимо убедиться в следующем:
- Ваша команда использует издателя в контексте транзакции базы данных (например, SQL, Firestore, Bolt).
- Компонент Forwarder работает, используя подписчика базы данных и издателя брокера сообщений.
В данном случае, команда может выглядеть следующим образом:
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
Чтобы включить работу компонента Forwarder в фоновом режиме и пересылать сообщения из MySQL в Google Pub/Sub, необходимо настроить его следующим образом:
Для полного кода обратитесь к github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// Сохранить данные в MySQL и опубликовать события в Google Cloud Pub/Sub в транзакции.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("Откат транзакции из-за ошибки", watermill.LogFields{"error": err.Error()})
// В случае ошибки, из-за отката транзакции MySQL, мы можем гарантировать, что не произойдут следующие нежелательные сценарии:
// - Событие опубликовано, но данные не сохранены,
// - Данные сохранены, но событие не опубликовано.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// Декорировать издателя оболочкой, понятной компоненту forwarder.
publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// Опубликовать объявление о завершении лотереи. Обратите внимание, что мы публикуем здесь в Google Cloud Pub/Sub, используя декорированный издатель MySQL.
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
Если вы хотите узнать больше об этом примере, пожалуйста, найдите его реализацию [здесь](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).