Pentingnya Memublikasikan Pesan dalam Transaksi

Ketika berurusan dengan aplikasi berbasis peristiwa, ada saat-saat di mana Anda perlu mempertahankan status aplikasi dan memublikasikan pesan untuk memberitahu bagian lain dari sistem tentang apa yang baru saja terjadi. Dalam skenario ideal, Anda ingin mempertahankan status aplikasi dan memublikasikan pesan dalam satu transaksi, karena kegagalan melakukannya dapat menyebabkan masalah konsistensi data. Untuk secara bersamaan melakukan penyimpanan data dan mempublikasikan peristiwa dalam satu transaksi, Anda harus dapat mempublikasikan pesan ke database yang sama yang digunakan untuk penyimpanan data, atau menerapkan mekanisme komit dua fase (2PC). Jika Anda tidak ingin mengubah pialang pesan ke database dan tidak ingin menciptakan ulang sistem yang sudah ada, Anda dapat menyederhanakan pekerjaan Anda dengan menggunakan komponen Forwarder Watermill!

Komponen Forwarder

Anda dapat menganggap Forwarder sebagai sebuah demon latar belakang yang menunggu pesan-pesan untuk dipublikasikan ke database dan memastikan bahwa mereka akhirnya mencapai pialang pesan.

Untuk membuat Forwarder menjadi generik dan transparan untuk digunakan, ia mendengarkan pada topik terdedikasi pada database perantara dan menggunakan penerbit Forwarder yang dihiasi untuk mengirim pesan-pesan yang terenkapsulasi. Forwarder membuka pesan-pesan tersebut dan mengirimkannya ke topik target yang ditentukan pada pialang pesan.

Contoh

Mari kita pertimbangkan contoh berikut: ada sebuah perintah yang bertanggung jawab untuk melakukan undian lotre. Perintah tersebut harus secara acak memilih seorang pengguna terdaftar dalam sistem sebagai pemenang. Sambil melakukan hal ini, ia juga harus mempertahankan keputusannya dengan menyimpan entri database yang mengaitkan ID lotre unik dengan ID pengguna yang terpilih. Selain itu, sebagai sistem berbasis peristiwa, ia juga harus mempublikasikan peristiwa LotteryConcluded, sehingga komponen-komponen lain dapat bereaksi secara tepat. Agar lebih tepat - akan ada sebuah komponen yang bertanggung jawab untuk mengirim hadiah kepada pemenang lotere. Komponen tersebut akan menerima peristiwa LotteryConcluded dan memverifikasi dengan entri database menggunakan ID lotre tersemat untuk menentukan pemenang.

Dalam skenario kita, database yang digunakan adalah MySQL dan pialang pesan adalah Google Pub/Sub, namun bisa juga menggunakan dua teknologi lain.

Ketika mengimplementasikan perintah seperti ini, berbagai pendekatan dapat diambil. Selanjutnya, akan kita perkenalkan tiga upaya yang mungkin dan menyoroti kekurangan-kekurangan mereka.

Terbitkan Acara Terlebih Dahulu, Kemudian Simpan Data

Dalam pendekatan ini, perintah pertama-tama menerbitkan suatu acara, dan kemudian menyimpan data. Meskipun metode ini mungkin berfungsi dengan benar dalam kebanyakan kasus, mari mencoba untuk mengidentifikasi potensi masalah.

Perintah perlu menjalankan tiga operasi dasar:

  1. Pilih pengguna acak A sebagai pemenang.
  2. Terbitkan acara LotteryConcluded untuk memberitahukan bahwa lotere B telah berakhir.
  3. Simpan di database bahwa lotere B telah dimenangkan oleh pengguna A.

Setiap langkah rentan terhadap kegagalan, yang dapat mengganggu alur perintah kita. Jika langkah pertama gagal, konsekuensinya tidak akan parah - kita hanya perlu mengembalikan galat dan mempertimbangkan seluruh perintah sebagai gagal. Tidak ada data yang akan disimpan, dan tidak ada acara yang akan diterbitkan. Kita bisa sekadar menjalankan ulang perintahnya.

Jika langkah kedua gagal, kita masih belum menerbitkan acara atau menyimpan data di database. Kita bisa menjalankan ulang perintahnya dan mencoba lagi.

Skenario paling menarik adalah apa yang terjadi jika langkah ketiga gagal. Setelah langkah kedua, kita sudah menerbitkan acaranya, tetapi pada akhirnya tidak ada data yang akan disimpan di database. Komponen lain akan menerima sinyal bahwa lotere telah berakhir, tetapi tidak akan ada pemenang yang terkait dengan ID lotere yang dikirim dalam acara tersebut. Mereka tidak akan dapat memverifikasi pemenang, sehingga operasi mereka juga harus dianggap sebagai gagal.

Kita masih bisa keluar dari situasi ini, tetapi mungkin memerlukan beberapa operasi manual, seperti menjalankan ulang perintah menggunakan ID lotere dari acara yang sudah dikirim.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. Terbitkan acara ke Google Cloud Pub/Sub terlebih dahulu, kemudian simpan data ke MySQL.
func terbitkanAcaraDanSimpanData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// Jika terjadi galat, acara telah dikirim tetapi data belum disimpan.
	if err = simulasiGalat(); err != nil {
		logger.Error("Gagal menyimpan data", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

Simpan data terlebih dahulu, kemudian publikasikan acara

Dalam pendekatan kedua, kita akan mencoba untuk mengatasi kekurangan dari metode store terlebih dahulu. Untuk mencegah kegagalan situasi bocor ke komponen eksternal ketika status tidak tersimpan dengan benar di database dan acara tidak dipublikasikan, kita akan mengubah urutan tindakan sebagai berikut:

  1. Secara acak pilih pengguna A sebagai pemenang lotre B.
  2. Simpan informasi bahwa lotre B telah dimenangkan oleh pengguna A di database.
  3. Publikasikan acara LotteryConcluded, memberitahukan bahwa lotre B telah berakhir.

Sama seperti pendekatan pertama, jika dua tindakan pertama gagal, kita tidak akan mengalami konsekuensi. Dalam kasus kegagalan tindakan ketiga, data kita akan tetap tersimpan di database, tetapi tidak ada acara yang dipublikasikan. Dalam hal ini, kita tidak akan mengungkapkan situasi kegagalan ke komponen di luar lotre. Namun, mengingat perilaku sistem yang diharapkan, pemenang kita tidak akan dapat menerima hadiah karena tidak ada acara yang dilewatkan ke komponen yang bertanggung jawab atas operasi ini.

Masalah ini juga bisa diatasi dengan operasi manual, yaitu, secara manual mempublikasikan acara. Tetapi kita bisa melakukan yang lebih baik.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. Simpan data ke MySQL terlebih dahulu, kemudian langsung publikasikan acaranya ke Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

// Jika ini gagal, data kita sudah tersimpan, tetapi tidak ada acara yang dipublikasikan.
	if err = simulateError(); err != nil {
		logger.Error("Gagal untuk mempublikasikan acara", err, nil)
		return err
	}

err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
	return err
}
// ...

Menyimpan Data dan Memublikasikan Acara dalam Transaksi

Bayangkan bahwa perintah kita dapat melakukan tugas kedua dan ketiga secara bersamaan. Mereka akan di-commit secara atomik, artinya jika satu tugas gagal, yang lain tidak dapat berhasil. Hal ini dapat dicapai dengan memanfaatkan mekanisme transaksi yang sudah diimplementasikan di sebagian besar basis data. MySQL yang digunakan dalam contoh kami adalah salah satunya.

Untuk menyimpan data dan memublikasikan acara dalam satu transaksi, kita perlu mampu memublikasikan pesan ke MySQL. Karena kita tidak ingin mengubah pialang pesan untuk didukung oleh MySQL di seluruh sistem, kita harus menemukan cara lain untuk mencapainya.

Berita baiknya adalah: Watermill menyediakan semua alat yang diperlukan! Jika Anda menggunakan basis data seperti MySQL, PostgreSQL (atau basis data SQL lainnya), Firestore, atau Bolt, Anda dapat memublikasikan pesan ke dalamnya. Komponen Forwarder akan membantu Anda memilih semua pesan yang Anda publikasikan ke basis data dan meneruskannya ke pialang pesan Anda.

Anda perlu memastikan bahwa:

  1. Perintah Anda menggunakan penerbit yang bekerja dalam konteks transaksi basis data (misalnya SQL, Firestore, Bolt).
  2. Komponen Forwarder berjalan, menggunakan langganan basis data dan penerbit pialang pesan.

Dalam hal ini, perintah dapat terlihat seperti ini:

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

Untuk mengaktifkan komponen Forwarder agar dapat bekerja di latar belakang dan meneruskan pesan dari MySQL ke Google Pub/Sub, Anda perlu mengatur seperti berikut:

Untuk kode lengkap, silakan lihat github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// Menyimpan data ke MySQL dan memublikasikan acara ke Google Cloud Pub/Sub dalam satu transaksi.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			tx.Commit()
		} else {
			logger.Info("Membatalkan transaksi karena terjadi kesalahan", watermill.LogFields{"error": err.Error()})
			// Dalam kasus kesalahan, karena rollback transaksi MySQL, kita dapat memastikan skenario tidak diinginkan berikut tidak terjadi:
			// - Acara dipublikasikan tetapi data tidak persist,
			// - Data terpersist tapi acara tidak dipublikasikan.
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// Hiasi penerbit dengan amplop yang dipahami oleh komponen forwarder.
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// Memublikasikan pengumuman acara penutupan undian. Harap diperhatikan bahwa kami mempublikasikan ke topik Google Cloud di sini menggunakan penerbit MySQL yang dihiasi.
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...
Jika Anda ingin mempelajari lebih lanjut tentang contoh ini, silakan temukan implementasinya [di sini](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).