トランザクション内でメッセージを発行する重要性

イベント駆動型のアプリケーションを扱う際には、アプリケーションの状態を永続化し、他のシステム部分に何が起こったかを通知するためにメッセージを発行する必要があることがあります。理想的なシナリオでは、アプリケーションの状態を永続化し、メッセージを発行することを単一のトランザクション内で行いたいと思うでしょう。これを怠ると、データの整合性の問題が発生する可能性があります。データの格納とイベントの同時コミットを行うためには、データの格納に使用されるデータベースにメッセージを発行できるようにしたり、二段階コミット(2PC)メカニズムを実装する必要があります。メッセージブローカーをデータベースに変更したくないし、車輪の再発明をしたくない場合は、WatermillのForwarderコンポーネントを使用することで作業を簡素化できます!

Forwarderコンポーネント

Forwarderは、メッセージがデータベースに公開されるのを待ち、最終的にメッセージブローカーに到達するようにするバックグラウンドデーモンと考えることができます。

Forwarderを汎用性があり透過的に使用できるようにするために、中間データベースの専用トピックでリッスンし、装飾されたForwarder publisherを使用してカプセル化されたメッセージを送信します。Forwarderはそれらを展開し、指定されたターゲットトピックにメッセージを送信します。

次の例を考えてみましょう。システム内の登録されたユーザーからランダムに抽選を行い、その抽選で選ばれたユーザーのIDと一意の抽選IDをリンクしたデータベースエントリを永続化することが責任のあるコマンドがあります。また、イベント駆動型システムとして、LotteryConcludedイベントを公開して、他のコンポーネントが適切に反応できるようにする必要があります。具体的には、抽選勝者に賞品を送信する責任があるコンポーネントがあります。LotteryConcludedイベントを受け取り、埋め込まれた抽選IDを使用してデータベースエントリを確認し、勝者を決定します。

このシナリオでは、データベースはMySQLであり、メッセージブローカーはGoogle Pub/Subですが、他の2つのテクノロジでも適用できます。

このようなコマンドを実装する際、さまざまなアプローチが考えられます。以下では、3つの可能な試みを紹介し、それぞれの欠点を指摘します。

イベントをまず公開し、その後データを保存する

この手法では、コマンドはまずイベントを公開し、その後データを保存します。この方法はほとんどの場合正常に動作するかもしれませんが、潜在的な問題を特定しましょう。

コマンドは3つの基本的な操作を実行する必要があります:

  1. ランダムなユーザー A を受賞者として選択します。
  2. LotteryConcluded イベントを公開して、抽選 B が終了したことを通知します。
  3. データベースに、抽選 B がユーザー A によって獲得されたことを保存します。

それぞれのステップは失敗しやすく、これによりコマンドの流れが崩れる可能性があります。最初のステップが失敗した場合、その結果は深刻ではありません - 単純にエラーを返し、コマンド全体を失敗とみなします。データは保存されず、イベントも公開されません。単純にコマンドを再実行することができます。

2番目のステップが失敗した場合、まだイベントを公開したりデータをデータベースに保存していません。コマンドを再実行してもう一度試すことができます。

最も興味深いシナリオは、3番目のステップが失敗した場合です。2番目のステップの後で、既にイベントを公開していますが、最終的にデータはデータベースに保存されません。他のコンポーネントは抽選が終了したことを受け取りますが、イベントで送られた抽選IDに関連付けられた受賞者はいないため、受賞者を確認することができません。そのため、彼らの操作も失敗とみなさなければなりません。

この状況からは抜け出すことができますが、イベントで既に送信された抽選IDを使用してコマンドを再実行するなど、手動操作が必要な場合があります。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. Google Cloud Pub/Sub にイベントを最初に公開し、その後 MySQL にデータを保存する。
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// エラーがあれば、イベントは送信されましたがデータはまだ保存されていません。
	if err = simulateError(); err != nil {
		logger.Error("データの保存に失敗しました", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

最初にデータを保存し、その後イベントを公開する

2番目のアプローチでは、最初の方法の不備に対処しようとします。データベースに状態が正しく永続化されていない場合やイベントが公開されていない場合に、失敗状況が外部コンポーネントに漏れないようにするため、アクションの順序を以下のように変更します。

  1. ユーザーAを抽選Bの勝者としてランダムに選択します。
  2. データベースに抽選BがユーザーAによって獲得された情報を永続化します。
  3. LotteryConcludedイベントを公開し、抽選Bが終了したことを通知します。

最初の2つのアクションが失敗した場合も、何も影響がありません。3番目のアクションが失敗した場合は、データがデータベースに永続化されますが、イベントは公開されません。この場合、抽選外のコンポーネントに失敗状況が漏れることはありません。ただし、期待されるシステムの動作を考慮すると、当選者はイベントが当該操作を担当しているコンポーネントに渡されないため、賞品を受け取ることができません。

この問題は手動操作によっても解決できますが、もっと良い方法があります。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. 最初にMySQLにデータを永続化し、その後Google Cloud Pub/Subにイベントを直接公開します。
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

// ここで失敗した場合は、データは既に永続化されていますが、イベントは公開されていません。
	if err = simulateError(); err != nil {
		logger.Error("イベントの公開に失敗しました", err, nil)
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}
// ...

トランザクションでデータを保存し、イベントを公開する

まず、コマンドが2番目と3番目のタスクを同時に実行できると想像してください。これらはアトミックにコミットされ、つまり1つのタスクが失敗すると他のタスクは成功しないことになります。これは、ほとんどのデータベースにすでに実装されているトランザクションメカニズムを活用することで実現できます。この例で使用されているMySQLもその1つです。

データを保存し、イベントを単一トランザクションで公開するためには、MySQLにメッセージを公開できる必要があります。システム全体でMySQLをサポートするためにメッセージブローカーを変更したくはないので、別の方法を見つける必要があります。

うれしいことに、Watermillはすべての必要なツールを提供しています! MySQL、PostgreSQL(または他のSQLデータベース)、Firestore、またはBoltなどのデータベースを使用している場合、Forwarder コンポーネントを使用してメッセージを公開できます。

以下の点を確認する必要があります:

  1. コマンドは、データベーストランザクションのコンテキストで動作するパブリッシャーを使用していること(例: SQLFirestoreBolt)。
  2. Forwarder コンポーネントが、データベースサブスクライバーとメッセージブローカーパブリッシャーを使用して実行されていること。

この場合、コマンドは次のようになります:

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

Forwarder コンポーネントをバックグラウンドで実行し、MySQLからGoogle Pub/Subにメッセージを転送するには、次のように設定する必要があります:

完全なコードは、github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.goを参照してください。

// ...
// データをMySQLに永続化し、イベントをGoogle Cloud Pub/Subにトランザクションで公開する。
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			tx.Commit()
		} else {
			logger.Info("エラーが発生したためトランザクションをロールバックします", watermill.LogFields{"error": err.Error()})
			// エラーが発生した場合は、MySQL トランザクションのロールバックにより、以下のような望ましくないシナリオが発生しないようにします:
			// - イベントが公開されたがデータが永続化されていない
			// - データが永続化されたがイベントが公開されていない
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// Forwarderコンポーネントが理解する封筒でパブリッシャーを装飾します。
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// 抽選結果のイベントを公開します。ここではデコレートされたMySQLパブリッシャーを使用してGoogle Cloudトピックに公開していることに注意してください。
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...
この例について詳しく学びたい場合は、その実装を[こちら](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go)で見つけることができます。