ความสำคัญของการเผยแพร่ข้อความในรายการทำธุรกิจ
เมื่อมีการดำเนินการแอปพลิเคชันที่ขับเคลื่อนตามเหตุการณ์ อาจมีช่วงเวลาที่คุณต้องการทราบถึงสถานะแอพลิเคชันที่ถูกบันทึกไว้และเผยแพร่ข้อความเพื่อแจ้งให้ส่วนอื่นของระบบทราบเกี่ยวกับสิ่งที่เพิงเกิดขึ้น ในสถานการณ์ที่เหมาะสม คุณต้องการบันทึกสถานะแอพลิเคชันและเผยแพร่ข้อความ ภายในการทำธุรกิจเดียวกัน โดยที่การทำไม่ถูกต้องอาจนำไปสู่ปัญหาความสอดคล้องข้อมูล ในการยืนยันการเก็บข้อมูลและเผยแพร่เหตุการณ์ไปพร้อมกันในการทำธุรกิจเดียวกัน คุณต้องสามารถเผยแพร่ข้อความไปยังฐานข้อมูลเดียวกันที่ใช้สำหรับการเก็บข้อมูล หรือใช้กลไก two-phase commit (2PC) หากคุณไม่ต้องการเปลี่ยนทางโรงงานข้อความไปยังฐานข้อมูลและไม่ต้องการสร้างการทำซ้ำอีกครั้ง คุณสามารถทำงานของคุณให้ง่ายขึ้นได้โดยการใช้ Forwarder component ของ Watermill!
คอมโพเนนต์ Forwarder
คุณสามารถคิดว่า Forwarder เป็นเดมอนพื้นหลังที่รอข้อความที่จะถูกเผยแพร่ไปยังฐานข้อมูล และรับรองว่าท้ายที่สุดมันจะถึงที่ข้อความของโบรกเกอร์
เพื่อทำให้ Forwarder สามารถใช้งานได้เหมาะสมและโปร่งใส มันฟังอยู่ในหัวข้อที่จัดสรรบนฐานข้อมูลกลางและใช้ Forwarder publisher ที่ถูกตกแต่ง เพื่อส่งข้อความที่ห่อหุ้มไป Forwarder จะแกะเอาออกและส่งไปยังหัวข้อที่เป้าหมายบนโบรกเกอร์ของข้อความ
ตัวอย่าง
พิจารณาตัวอย่างต่อไปนี้: มีคำสั่งที่รับผิดชอบการดำเนินงานสุ่มสลาก มันต้องเลือกผู้ใช้ที่ลงทะเบียนในระบบไว้เป็นผู้โชคดี ในระหว่างการทำเช่นนั้น มันยังควรบันทึกการตัดสินใจโดยการเก็บข้อมูลเข้าฐานข้อมูลที่เชื่อมโยงกับรหัสสลากที่ไม่ซ้ำกัน พร้อมกับการเป็นระบบที่ขับเคลื่อนตามเหตุการณ์ มันยังควรเผยแพร่เหตุการณ์ LotteryConcluded
เพื่อให้ส่วนประกอบอื่น ๆ สามารถตอบโต้ได้อย่างเหมาะสม ให้แน่ใจ - จะมีส่วนประกอบที่รับผิดชอบการส่งรางวัลไปหาผู้โชคดีของสลาก มันจะได้รับเหตุการณ์ LotteryConcluded
และทำการตรวจสอบกับการเข้าถึงฐานข้อมูลโดยใช้รหัสสลากที่ซ่อนอยู่เพื่อกำหนดผู้โชคดี
ในสถานการณ์ของเรา ฐานข้อมูลคือ MySQL และโปรแกรมการเผยแพร่ข้อความคือ Google Pub/Sub แต่มันสามารถเป็นหรือทั้งสองเทคโนโลยี
เมื่อปฏิบัติคำสั่งในตัวอย่างนี้ มีเทคนิคที่เหลือเหล่านี้ที่สามารถนำไปใช้ได้ ในที่ต่อมา เราจะแนะนำว่ามีสามวิธีที่เป็นไปได้และชี้แจงข้อบกพร่องของมันไว้ด้วย
ตีพิมพ์เหตุการณ์ก่อน จากนั้นจึงเก็บข้อมูล
ในวิธีการนี้ คำสั่งจะทำการตีพิมพ์เหตุการณ์ก่อนและจึงเก็บข้อมูล แม้ว่าวิธีนี้จะทำงานได้อย่างถูกต้องในเหตุการณ์ส่วนใหญ่ แต่เราจะพยายามที่จะระบุปัญหาที่เป็นไปได้
คำสั่งจำเป็นต้องดำเนินการที่มีหลักการเบื้องต้นสามอย่าง:
- เลือกผู้ใช้สุ่ม
A
เป็นผู้โชคดี - ตีพิมพ์เหตุการณ์
LotteryConcluded
เพื่อแจ้งให้ทราบว่าสลากB
ได้สิ้นสุดลง - เก็บข้อมูลในฐานข้อมูลว่าสลาก
B
ได้ถูกรางวัลโดยผู้ใช้A
ขั้นตอนทุกขั้นตอนมีโอกาสล้มเหลวที่อาจเสี่ยงถึงการหยุดการทำงานของคำสั่งเรา ถ้าขั้นตอนแรกล้มเหลว ผลกระทบจะไม่รุนแรง - เราเพียงต้องคืนค่าผิดพลาดและพิจารณาคำสั่งทั้งหมดว่าพ้นสิ้นลงสำเร็จ ข้อมูลจะไม่ถูกเก็บไว้ และไม่มีเหตุการณ์ไหนถูกตีพิมพ์ เราสามารถที่จะเรียกใช้คำสั่งใหม่อีกครั้ง
หากขั้นตอนที่สองล้มเหลว เรายังคงไม่ได้ตีพิมพ์เหตุการณ์หรือเก็บข้อมูลในฐานข้อมูล เราสามารถเรียกใช้คำสั่งใหม่อีกครั้ง
สถานการณ์ที่น่าสนใจที่สุดคือสิ่งที่เกิดขึ้นถ้าขั้นตอนที่สามล้มเหลว หลังขั้นตอนที่สอง เราได้ตีพิมพ์เหตุการณ์ไปแล้ว แต่ในที่สุดข้อมูลจะไม่ถูกเก็บไว้ในฐานข้อมูล องค์กรอื่น ๆ จะได้รับสัญญาณว่าสลากได้สิ้นสุดลง แต่จะไม่มีผู้โชคดีที่สัมพันธ์กับ ID สลากที่ได้ถูกส่งไปในเหตุการณ์ เขาจะไม่สามารถพิสูจน์ผู้โชคดีได้ เพราะฉะนั้นการดำเนินการของเขาก็ต้องถือเป็นล้มเหลว
เรายังคงสามารถออกจากสถานการณ์นี้ได้ แต่อาจจะต้องใช้การดำเนินการบางอย่างที่เป็นกลางมือ เช่น การรันคำสั่งใหม่โดยใช้ ID สลากจากเหตุการณ์ที่ถูกส่งไปแล้ว
รหัสภายในเต็มรูปแบบ: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. ตีพิมพ์เหตุการณ์ไปยัง Google Cloud Pub/Sub ก่อน จากนั้นจึงเก็บข้อมูลใน MySQL
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// หากมีความล้มเหลว หลังจากนั้นเหตุการณ์ได้ถูกส่งแต่ข้อมูลยังไม่ถูกบันทึกไว้
if err = simulateError(); err != nil {
logger.Error("การบันทึกข้อมูลล้มเหลว", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
เก็บข้อมูลก่อน แล้วจึงเผยแพร่เหตุการณ์
ในวิธีการที่สอง เราจะพยายามแก้ไขข้อบกพร่องของวิธีการที่ทำการสอบถามก่อนก่อน โดยเพื่อป้องกันไม่ให้สถานการณ์ที่เกิดความล้มเหตุถูกรั่วไหลไปยังส่วนประกอบภายนอกเมื่อสถานะไม่ได้รับการบันทึกไว้ในฐานข้อมูลอย่างถูกต้องและเหตุการณ์ไม่ได้ถูกเผยแพร่ เราจะเปลี่ยนลำดับของการดำเนินการตามนี้:
- เลือกผู้ใช้
A
ที่ได้รับรางวัลจากสลากB
โดยสุ่ม - บันทึกข้อมูลว่า สลาก
B
ได้รับรางวัลจากผู้ใช้A
ลงในฐานข้อมูล - เผยแพร่เหตุการณ์
LotteryConcluded
เพื่อแจ้งให้ทราบว่า สลากB
ได้จบลง
เหมือนกับวิธีการแรก หากการดำเนินการสองขั้นตอนแรกล้มเหลว เราจะไม่ประสบผลสุดท้าย เมื่อการดำเนินการที่สามล้มเหลว ข้อมูลของเราจะถูกบันทึกไว้ในฐานข้อมูล แต่ไม่มีเหตุการณ์ที่ถูกเผยแพร่ ในกรณีนี้ เราจะไม่ได้เปิดเผยสถานการณ์ที่ล้มเหลวไปยังส่วนประกอบภายนอกของสลาก เว้นไปให้ทราบสำหรับระบบที่คาดหวัง ผู้โชคดีของเราจะไม่สามารถรับรางวัลได้ เนื่องจากไม่มีเหตุการณ์ที่ถูกส่งไปยังส่วนประกอบที่รับผิดชอบสำหรับการดำเนินการนี้
ปัญหานี้สามารถแก้ไขได้โดยการดำเนินการด้วยตัวเอง นั่นคือ เผยแพร่เหตุการณ์โดยการดำเนินการด้วยตัวเอง แต่เราสามารถทำให้ดีขึ้นได้
// ...
// 2. บันทึกข้อมูลลงใน MySQL ก่อน จากนั้นเผยแพร่เหตุการณ์โดยตรงไปยัน Google Cloud Pub/Sub
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// หากการดำเนินการนี้ล้มเหลว ข้อมูลของเราจะถูกบันทึกไว้แล้ว แต่ไม่มีเหตุการณ์ที่ถูกเผยแพร่
if err = simulateError(); err != nil {
logger.Error("การเผยแพร่เหตุการณ์ล้มเหลว", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
การเก็บข้อมูลและการเผยแพร่เหตุการณ์ในระหว่างทรานแซกชัน
สมมติว่าคำสั่งของเราสามารถดำเนินการงานที่สองและทราบงานที่สามพร้อมกัน เหล่านี้จะถูกยืนยันแบบอะตอมิคซึ่งหมายความว่าหากมีการทำงานล้มเหลวในหนึ่งงาน งานอีกอันจะไม่สามารถประสบความสำเร็จ นี้สามารถทำได้โดยการใช้กลไกทรานแซกชันที่เคยถูกนำมาใช้แล้วในฐานข้อมูลส่วนใหญ่ ๆ มี MySQL ที่ถูกนำมาใช้ในตัวอย่างของเรา
เพื่อที่จะเก็บข้อมูลและเผยแพร่ข้อความในทรานแซกชันเดียวกัน เราต้องสามารถเผยแพร่ข้อมูลไปยัง MySQL โดยเราไม่ต้องการเปลี่ยนแปลงตัวหรือของข้อมูลทั้งระบบให้รองรับโดย MySQL เราจึงต้องหาวิธีอื่น ๆ เพื่อทำให้สามารถทำได้นี้
ข่าวดีคือ: Watermill มีเครื่องมือทั้งหมดที่จำเป็น! หากคุณใช้ฐานข้อมูลเช่น MySQL, PostgreSQL (หรือฐานข้อมูล SQL อื่น ๆ), Firestore หรือ Bolt คุณสามารถเผยแพร่ข้อความไปยังพวกเขาได้ ส่วนประกอบ Forwarder จะช่วยคุณในการเลือกข้อความทั้งหมดที่คุณเผยแพร่ไปยังฐานข้อมูลและส่งต่อไปยังตัวสตรีมของข้อมูลของคุณ
คุณต้องให้การมั่นใจว่า:
- คำสั่งของคุณใช้ตัวเผยแพร่ที่ทำงานในบริบทของทรานแซกชันของฐานข้อมูล (เช่น SQL, Firestore, Bolt).
- ส่วนประกอบ Forwarder กำลังทำงานโดยใช้ตัวติดตามของฐานข้อมูลและตัวเผยแพร่ของข้อมูลตัวสตรีม
ในกรณีนี้ คำสั่งอาจมีลักษณะดังนี้:
เพื่อให้ส่งตัวติดตาม Forwarder ให้ทำงานพร้อมและส่งตัวข้อความจาก MySQL ไปยัง Google Pub/Sub คุณต้องตั้งค่าดังนี้:
สำหรับโค้ดทั้งหมด กรุณารองไห้ที่ github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// ทำการบันทึกข้อมูลไปยัง MySQL และเผยแพร่เหตุการณ์ไปยัง Google Cloud Pub/Sub ในทรานแซกชัน
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("ย้อนกลับการทำงานเนื่องจากมีข้อผิดพลาด", watermill.LogFields{"error": err.Error()})
// ในกรณีของข้อผิดพลาด เนื่องจากการย้อนกลับของทรานแซกชันของ MySQL เราสามารถทำให้แน่ใจได้ว่า ภาวะที่ไม่ต้องการต่อไปทำซ้ำไม่เกิดขึ้นได้:
// - ข้อความถูกเผยแพร่แต่ข้อมูลไม่ถูกบันทึก,
// - ข้อมูลถูกบันทึกแต่ข้อความไม่ถูกเผยแพร่.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// ตกแต่งตัวเผยแพร่ด้วยซองไปยังบูรณะที่เข้าใจโดยส่วนประกอบของตัวที่ติดตาม
publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// เผยแพร่คำโฆษณาของเหตุการณ์สรุปสุ่ม โดยที่เราเผยแพร่ไปยังหัวข้อของ Google Cloud ที่นี่โดยที่ใช้ตัวเผยแพร่ของ MySQL ที่ถูกตกแต่ง
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
หากคุณต้องการเรียนรู้เพิ่มเติมเกี่ยวกับตัวอย่างนี้ โปรดค้นหาการนำไปใช้งานของมัน [ที่นี่](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go) ครับ/ค่ะ