ความสำคัญของการเผยแพร่ข้อความในรายการทำธุรกิจ

เมื่อมีการดำเนินการแอปพลิเคชันที่ขับเคลื่อนตามเหตุการณ์ อาจมีช่วงเวลาที่คุณต้องการทราบถึงสถานะแอพลิเคชันที่ถูกบันทึกไว้และเผยแพร่ข้อความเพื่อแจ้งให้ส่วนอื่นของระบบทราบเกี่ยวกับสิ่งที่เพิงเกิดขึ้น ในสถานการณ์ที่เหมาะสม คุณต้องการบันทึกสถานะแอพลิเคชันและเผยแพร่ข้อความ ภายในการทำธุรกิจเดียวกัน โดยที่การทำไม่ถูกต้องอาจนำไปสู่ปัญหาความสอดคล้องข้อมูล ในการยืนยันการเก็บข้อมูลและเผยแพร่เหตุการณ์ไปพร้อมกันในการทำธุรกิจเดียวกัน คุณต้องสามารถเผยแพร่ข้อความไปยังฐานข้อมูลเดียวกันที่ใช้สำหรับการเก็บข้อมูล หรือใช้กลไก two-phase commit (2PC) หากคุณไม่ต้องการเปลี่ยนทางโรงงานข้อความไปยังฐานข้อมูลและไม่ต้องการสร้างการทำซ้ำอีกครั้ง คุณสามารถทำงานของคุณให้ง่ายขึ้นได้โดยการใช้ Forwarder component ของ Watermill!

คอมโพเนนต์ Forwarder

คุณสามารถคิดว่า Forwarder เป็นเดมอนพื้นหลังที่รอข้อความที่จะถูกเผยแพร่ไปยังฐานข้อมูล และรับรองว่าท้ายที่สุดมันจะถึงที่ข้อความของโบรกเกอร์

เพื่อทำให้ Forwarder สามารถใช้งานได้เหมาะสมและโปร่งใส มันฟังอยู่ในหัวข้อที่จัดสรรบนฐานข้อมูลกลางและใช้ Forwarder publisher ที่ถูกตกแต่ง เพื่อส่งข้อความที่ห่อหุ้มไป Forwarder จะแกะเอาออกและส่งไปยังหัวข้อที่เป้าหมายบนโบรกเกอร์ของข้อความ

ตัวอย่าง

พิจารณาตัวอย่างต่อไปนี้: มีคำสั่งที่รับผิดชอบการดำเนินงานสุ่มสลาก มันต้องเลือกผู้ใช้ที่ลงทะเบียนในระบบไว้เป็นผู้โชคดี ในระหว่างการทำเช่นนั้น มันยังควรบันทึกการตัดสินใจโดยการเก็บข้อมูลเข้าฐานข้อมูลที่เชื่อมโยงกับรหัสสลากที่ไม่ซ้ำกัน พร้อมกับการเป็นระบบที่ขับเคลื่อนตามเหตุการณ์ มันยังควรเผยแพร่เหตุการณ์ LotteryConcluded เพื่อให้ส่วนประกอบอื่น ๆ สามารถตอบโต้ได้อย่างเหมาะสม ให้แน่ใจ - จะมีส่วนประกอบที่รับผิดชอบการส่งรางวัลไปหาผู้โชคดีของสลาก มันจะได้รับเหตุการณ์ LotteryConcluded และทำการตรวจสอบกับการเข้าถึงฐานข้อมูลโดยใช้รหัสสลากที่ซ่อนอยู่เพื่อกำหนดผู้โชคดี

ในสถานการณ์ของเรา ฐานข้อมูลคือ MySQL และโปรแกรมการเผยแพร่ข้อความคือ Google Pub/Sub แต่มันสามารถเป็นหรือทั้งสองเทคโนโลยี

เมื่อปฏิบัติคำสั่งในตัวอย่างนี้ มีเทคนิคที่เหลือเหล่านี้ที่สามารถนำไปใช้ได้ ในที่ต่อมา เราจะแนะนำว่ามีสามวิธีที่เป็นไปได้และชี้แจงข้อบกพร่องของมันไว้ด้วย

ตีพิมพ์เหตุการณ์ก่อน จากนั้นจึงเก็บข้อมูล

ในวิธีการนี้ คำสั่งจะทำการตีพิมพ์เหตุการณ์ก่อนและจึงเก็บข้อมูล แม้ว่าวิธีนี้จะทำงานได้อย่างถูกต้องในเหตุการณ์ส่วนใหญ่ แต่เราจะพยายามที่จะระบุปัญหาที่เป็นไปได้

คำสั่งจำเป็นต้องดำเนินการที่มีหลักการเบื้องต้นสามอย่าง:

  1. เลือกผู้ใช้สุ่ม A เป็นผู้โชคดี
  2. ตีพิมพ์เหตุการณ์ LotteryConcluded เพื่อแจ้งให้ทราบว่าสลาก B ได้สิ้นสุดลง
  3. เก็บข้อมูลในฐานข้อมูลว่าสลาก B ได้ถูกรางวัลโดยผู้ใช้ A

ขั้นตอนทุกขั้นตอนมีโอกาสล้มเหลวที่อาจเสี่ยงถึงการหยุดการทำงานของคำสั่งเรา ถ้าขั้นตอนแรกล้มเหลว ผลกระทบจะไม่รุนแรง - เราเพียงต้องคืนค่าผิดพลาดและพิจารณาคำสั่งทั้งหมดว่าพ้นสิ้นลงสำเร็จ ข้อมูลจะไม่ถูกเก็บไว้ และไม่มีเหตุการณ์ไหนถูกตีพิมพ์ เราสามารถที่จะเรียกใช้คำสั่งใหม่อีกครั้ง

หากขั้นตอนที่สองล้มเหลว เรายังคงไม่ได้ตีพิมพ์เหตุการณ์หรือเก็บข้อมูลในฐานข้อมูล เราสามารถเรียกใช้คำสั่งใหม่อีกครั้ง

สถานการณ์ที่น่าสนใจที่สุดคือสิ่งที่เกิดขึ้นถ้าขั้นตอนที่สามล้มเหลว หลังขั้นตอนที่สอง เราได้ตีพิมพ์เหตุการณ์ไปแล้ว แต่ในที่สุดข้อมูลจะไม่ถูกเก็บไว้ในฐานข้อมูล องค์กรอื่น ๆ จะได้รับสัญญาณว่าสลากได้สิ้นสุดลง แต่จะไม่มีผู้โชคดีที่สัมพันธ์กับ ID สลากที่ได้ถูกส่งไปในเหตุการณ์ เขาจะไม่สามารถพิสูจน์ผู้โชคดีได้ เพราะฉะนั้นการดำเนินการของเขาก็ต้องถือเป็นล้มเหลว

เรายังคงสามารถออกจากสถานการณ์นี้ได้ แต่อาจจะต้องใช้การดำเนินการบางอย่างที่เป็นกลางมือ เช่น การรันคำสั่งใหม่โดยใช้ ID สลากจากเหตุการณ์ที่ถูกส่งไปแล้ว

รหัสภายในเต็มรูปแบบ: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. ตีพิมพ์เหตุการณ์ไปยัง Google Cloud Pub/Sub ก่อน จากนั้นจึงเก็บข้อมูลใน MySQL
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// หากมีความล้มเหลว หลังจากนั้นเหตุการณ์ได้ถูกส่งแต่ข้อมูลยังไม่ถูกบันทึกไว้
	if err = simulateError(); err != nil {
		logger.Error("การบันทึกข้อมูลล้มเหลว", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

เก็บข้อมูลก่อน แล้วจึงเผยแพร่เหตุการณ์

ในวิธีการที่สอง เราจะพยายามแก้ไขข้อบกพร่องของวิธีการที่ทำการสอบถามก่อนก่อน โดยเพื่อป้องกันไม่ให้สถานการณ์ที่เกิดความล้มเหตุถูกรั่วไหลไปยังส่วนประกอบภายนอกเมื่อสถานะไม่ได้รับการบันทึกไว้ในฐานข้อมูลอย่างถูกต้องและเหตุการณ์ไม่ได้ถูกเผยแพร่ เราจะเปลี่ยนลำดับของการดำเนินการตามนี้:

  1. เลือกผู้ใช้ A ที่ได้รับรางวัลจากสลาก B โดยสุ่ม
  2. บันทึกข้อมูลว่า สลาก B ได้รับรางวัลจากผู้ใช้ A ลงในฐานข้อมูล
  3. เผยแพร่เหตุการณ์ LotteryConcluded เพื่อแจ้งให้ทราบว่า สลาก B ได้จบลง

เหมือนกับวิธีการแรก หากการดำเนินการสองขั้นตอนแรกล้มเหลว เราจะไม่ประสบผลสุดท้าย เมื่อการดำเนินการที่สามล้มเหลว ข้อมูลของเราจะถูกบันทึกไว้ในฐานข้อมูล แต่ไม่มีเหตุการณ์ที่ถูกเผยแพร่ ในกรณีนี้ เราจะไม่ได้เปิดเผยสถานการณ์ที่ล้มเหลวไปยังส่วนประกอบภายนอกของสลาก เว้นไปให้ทราบสำหรับระบบที่คาดหวัง ผู้โชคดีของเราจะไม่สามารถรับรางวัลได้ เนื่องจากไม่มีเหตุการณ์ที่ถูกส่งไปยังส่วนประกอบที่รับผิดชอบสำหรับการดำเนินการนี้

ปัญหานี้สามารถแก้ไขได้โดยการดำเนินการด้วยตัวเอง นั่นคือ เผยแพร่เหตุการณ์โดยการดำเนินการด้วยตัวเอง แต่เราสามารถทำให้ดีขึ้นได้

รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. บันทึกข้อมูลลงใน MySQL ก่อน จากนั้นเผยแพร่เหตุการณ์โดยตรงไปยัน Google Cloud Pub/Sub
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

// หากการดำเนินการนี้ล้มเหลว ข้อมูลของเราจะถูกบันทึกไว้แล้ว แต่ไม่มีเหตุการณ์ที่ถูกเผยแพร่
	if err = simulateError(); err != nil {
		logger.Error("การเผยแพร่เหตุการณ์ล้มเหลว", err, nil)
		return err
	}

err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
	return err
}
// ...

การเก็บข้อมูลและการเผยแพร่เหตุการณ์ในระหว่างทรานแซกชัน

สมมติว่าคำสั่งของเราสามารถดำเนินการงานที่สองและทราบงานที่สามพร้อมกัน เหล่านี้จะถูกยืนยันแบบอะตอมิคซึ่งหมายความว่าหากมีการทำงานล้มเหลวในหนึ่งงาน งานอีกอันจะไม่สามารถประสบความสำเร็จ นี้สามารถทำได้โดยการใช้กลไกทรานแซกชันที่เคยถูกนำมาใช้แล้วในฐานข้อมูลส่วนใหญ่ ๆ มี MySQL ที่ถูกนำมาใช้ในตัวอย่างของเรา

เพื่อที่จะเก็บข้อมูลและเผยแพร่ข้อความในทรานแซกชันเดียวกัน เราต้องสามารถเผยแพร่ข้อมูลไปยัง MySQL โดยเราไม่ต้องการเปลี่ยนแปลงตัวหรือของข้อมูลทั้งระบบให้รองรับโดย MySQL เราจึงต้องหาวิธีอื่น ๆ เพื่อทำให้สามารถทำได้นี้

ข่าวดีคือ: Watermill มีเครื่องมือทั้งหมดที่จำเป็น! หากคุณใช้ฐานข้อมูลเช่น MySQL, PostgreSQL (หรือฐานข้อมูล SQL อื่น ๆ), Firestore หรือ Bolt คุณสามารถเผยแพร่ข้อความไปยังพวกเขาได้ ส่วนประกอบ Forwarder จะช่วยคุณในการเลือกข้อความทั้งหมดที่คุณเผยแพร่ไปยังฐานข้อมูลและส่งต่อไปยังตัวสตรีมของข้อมูลของคุณ

คุณต้องให้การมั่นใจว่า:

  1. คำสั่งของคุณใช้ตัวเผยแพร่ที่ทำงานในบริบทของทรานแซกชันของฐานข้อมูล (เช่น SQL, Firestore, Bolt).
  2. ส่วนประกอบ Forwarder กำลังทำงานโดยใช้ตัวติดตามของฐานข้อมูลและตัวเผยแพร่ของข้อมูลตัวสตรีม

ในกรณีนี้ คำสั่งอาจมีลักษณะดังนี้:

โค้ดเต็ม: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

เพื่อให้ส่งตัวติดตาม Forwarder ให้ทำงานพร้อมและส่งตัวข้อความจาก MySQL ไปยัง Google Pub/Sub คุณต้องตั้งค่าดังนี้:

สำหรับโค้ดทั้งหมด กรุณารองไห้ที่ github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// ทำการบันทึกข้อมูลไปยัง MySQL และเผยแพร่เหตุการณ์ไปยัง Google Cloud Pub/Sub ในทรานแซกชัน
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			tx.Commit()
		} else {
			logger.Info("ย้อนกลับการทำงานเนื่องจากมีข้อผิดพลาด", watermill.LogFields{"error": err.Error()})
			// ในกรณีของข้อผิดพลาด เนื่องจากการย้อนกลับของทรานแซกชันของ MySQL เราสามารถทำให้แน่ใจได้ว่า ภาวะที่ไม่ต้องการต่อไปทำซ้ำไม่เกิดขึ้นได้:
			// - ข้อความถูกเผยแพร่แต่ข้อมูลไม่ถูกบันทึก,
			// - ข้อมูลถูกบันทึกแต่ข้อความไม่ถูกเผยแพร่.
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// ตกแต่งตัวเผยแพร่ด้วยซองไปยังบูรณะที่เข้าใจโดยส่วนประกอบของตัวที่ติดตาม
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// เผยแพร่คำโฆษณาของเหตุการณ์สรุปสุ่ม โดยที่เราเผยแพร่ไปยังหัวข้อของ Google Cloud ที่นี่โดยที่ใช้ตัวเผยแพร่ของ MySQL ที่ถูกตกแต่ง
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...
หากคุณต้องการเรียนรู้เพิ่มเติมเกี่ยวกับตัวอย่างนี้ โปรดค้นหาการนำไปใช้งานของมัน [ที่นี่](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go) ครับ/ค่ะ