اهمیت انتشار پیامها در یک تراکنش
در برخورد با برنامههای مبتنی بر رویداد، گاهی اوقات ممکن است نیاز داشته باشید که وضعیت برنامه را پایدار نگه دارید و پیامها را منتشر کنید تا بخشهای دیگر سیستم را در جریان اتفاقات قرار دهید. در یک سناریو آرمانی، شما میخواهید وضعیت برنامه را پایدار نگه دارید و یک پیام را در یک تراکنش تکیه کن که در صورت عدم انجام این فعالیتها، باعث مشکلات همگرایی داده میشود. برای همزمانی انجام دادههای ذخیرهسازی و انتشار رویدادها در یک تراکنش، شما باید بتوانید پیامها را به همان پایگاهدادهای که برای ذخیرهسازی داده استفاده میکنید، منتشر کنید یا یک مکانیزم دومیتباک (2PC) پیادهسازی کنید. اگر نمیخواهید واسط پیام را به پایگاه داده تغییر دهید و نمیخواهید چرخه را اختراع کنید، میتوانید کار خود را با استفاده از اجزای فرستنده از Watermill، سادهتر کنید!
اجزای فرستنده
میتوانید از فرستنده به عنوان یک دیمون پسزمینه فکر کنید که منتظر پیامها برای انتشار در پایگاه داده است و اطمینان حاصل میکند که در نهایت آنها به وسیله واسط پیام میرسد.
برای جعل فرستنده عمومی و شفاف برای استفاده، آن بر روی یک موضوع اختصاصی در پایگاه داده میشنود و از یک پخشکننده فرستنده تزئین شده استفاده میکند تا پیامهای کپسولهشده را ارسال کند. فرستنده آنها را باز کرده و به موضوع مقصد مشخص در واسط پیام ارسال میکند.
مثال
بیایید مثال زیر را در نظر بگیریم: یک دستور وجود دارد که مسئول انجام قرعهکشی است. باید به طور تصادفی یک کاربر ثبتنام شده در سیستم را به عنوان برنده انتخاب کند. همراه با انجام این کار، باید تصمیم خود را با ذخیره یک ورودی پایگاه داده که با شناسه یکتای قرعهکشی با شناسه کاربر انتخابشده مرتبط است، ثابت کند. علاوه بر این، به عنوان یک سیستم مبتنی بر رویداد، باید همچنین یک رویداد قرعهکشیپایانیافته
را منتشر کند، تا سایر اجزا بتوانند بهشکل مناسب واکنش نشان دهند. برای دقت - یک جزء مسئول برای ارسال جوایز به برنده قرعهکشی وجود دارد. این جزء رویداد قرعهکشیپایانیافته
را دریافت خواهد کرد و با استفاده از ورودی پایگاه داده با استفاده از شناسه یکتای قرعهکشی مطابقت خواهد داد و برنده را تعیین میکند.
در سناریو ما، پایگاهداده MySQL است و واسط پیام Google Pub/Sub است، اما ممکن است دو فناوری دیگر نیز باشد.
در هنگام پیادهسازی چنین دستوری، میتوان به روشهای مختلفی روی آورد. در ادامه، سه تلاش ممکن را معرفی خواهیم کرد و نقاط ضعف آنها را مشخص خواهیم کرد.
ابتدا رویداد منتشر شود، سپس دادهها ذخیره شوند
در این رویکرد، دستور ابتدا یک رویداد را منتشر میکند و سپس دادهها را ذخیره میکند. اگرچه این روش بیشتر مواقع به درستی کار میکند، اما بیایید سعی کنیم مشکلات احتمالی را شناسایی کنیم.
دستور باید سه عملیات اصلی را انجام دهد:
- انتخاب یک کاربر تصادفی به نام
A
به عنوان برنده. - انتشار رویداد
LotteryConcluded
برای اطلاع رسانی اتمام قرعه کشیB
. - ذخیره در پایگاه داده بر این که قرعه کشی
B
توسط کاربرA
برنده شده است.
هر یک از این مراحل در معرض شکست قرار دارند که میتواند جریان دستور را به هم بزند. اگر مرحله اول شکست بخورد، پیامدها جدی نخواهد داشت - ما فقط باید یک خطا را برگردانی کرده و تصور کنیم که کل دستور به عنوان شکست شناخته شده است. هیچ دادهای ذخیره نخواهد شد و هیچ رویدادی منتشر نخواهد شد. ما میتوانیم به سادگی دستور را دوباره اجرا کنیم.
اگر مرحله دوم شکست خورد، هنوز هیچ رویدادی منتشر نکردهایم یا داده را در پایگاه داده ذخیره نکردهایم. ما میتوانیم دستور را دوباره اجرا کرده و دوباره امتحان کنیم.
جالبترین سناریو آن است که اگر مرحله سوم شکست خورد چه اتفاقی میافتد. پس از مرحله دوم، ما قبلاً رویداد را منتشر کردهایم، اما در نهایت هیچ دادهای در پایگاه داده ذخیره نخواهد شد. سایر اجزا چشمهای شان را خواهند کرد که قرعه کشی به اتمام رسیده است، اما هیچ برندهای با شناسه قرعه کشی ارسال شده در رویداد مرتبط نخواهد بود. آنها قادر به تأیید برنده نخواهند بود، بنابراین عملیات آنها هم باید به عنوان شکست در نظر گرفته شود.
ما هنوز میتوانیم از این وضعیت خارج شویم، اما ممکن است نیاز به برخی عملیات دستی داشته باشیم، مانند دوباره اجرای دستور با استفاده از شناسه قرعه کشی از رویدادی که قبلاً ارسال شده است.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. ابتدا رویداد را به Google Cloud Pub/Sub منتشر کرده و سپس دادهها را در MySQL ذخیره میکند.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// اگر خطا وجود دارد، رویداد ارسال شده اما داده هنوز ذخیره نشده است.
if err = simulateError(); err != nil {
logger.Error("Failed to persist data", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
ابتدا دادهها را ذخیره کنید، سپس رویداد را منتشر کنید
در رویکرد دوم، ما سعی میکنیم به نقاط ضعف روش اول پرداخته و اقدام به رفع آنها کنیم. به منظور جلوگیری از نشتن وضعیت خطا به اجزای خارجی زمانی که وضعیت به درستی در پایگاه داده ذخیره نشده و رویدادها منتشر نشدهاند، ما ترتیب اقدامات را بهصورت زیر تغییر خواهیم داد:
- انتخاب تصادفی کاربر
A
به عنوان برنده قرعهکشیB
. - ذخیره اطلاعات برنده شدن کاربر
A
در قرعهکشیB
در پایگاه داده. - منتشر کردن رویداد
LotteryConcluded
که اعلام میکند قرعهکشیB
به اتمام رسیده است.
مشابه رویکرد اول، اگر دو اقدام اول شکست بخورند، هیچ پیامدی نخواهیم داشت. در صورت شکست سومین اقدام، دادههای ما در پایگاه داده ذخیره خواهد شد، اما هیچ رویدادی منتشر نخواهد شد. در این حالت، ما وضعیت خرابی را به اجزا خارج از قرعهکشی نشان نخواهیم داد. با این وجود، با توجه به رفتار مورد انتظار سیستم، برنده ما قادر به دریافت جایزه نخواهد بود زیرا هیچ رویدادی برای اجرای این عمل به مسئول مربوطه منتقل نشده است.
این مسئله همچنین میتوانست با عملیات دستی حل شود، به عنوان مثال، به صورت دستی رویداد را منتشر کنیم. اما ما میتوانیم بهتر عمل کنیم.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 2. ابتدا دادهها را به MySQL ذخیره کنید، سپس بهصورت مستقیم رویداد را در Google Cloud Pub/Sub منتشر کنید.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// اگر این اقدام شکست بخورد، دادههای ما قبلاً ذخیره شده است، اما هیچ رویدادی منتشر نشده است.
if err = simulateError(); err != nil {
logger.Error("Failed to publish the event", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
ذخیره داده و انتشار رویدادها در یک تراکنش
تصور کنید که دستور ما میتواند وظایف دوم و سوم را به یکدیگر انجام دهد. آنها به صورت اتمیک تایید میشوند، به این معنی که اگر یک وظیفه شکست بخورد، دیگری نمیتواند موفق شود. این میتواند با بهرهگیری از مکانیسم تراکنشی که از پیش در بیشتر پایگاهدادهها پیادهسازی شدهاست، به دست آید. مثلاً MySQL که در مثال ما استفاده شدهاست، یکی از آنهاست.
برای ذخیره داده و انتشار رویدادها در یک تراکنش واحد، ما باید قادر به انتشار پیامها به MySQL باشیم. چون ما نمیخواهیم تا جایی که سیستم کلی را پشتیبانی کند، پیکربندی ناقص پیام را تغییر دهیم تا توسط MySQL پشتیبانی شود، باید راههای دیگری برای رسیدن به این هدف پیدا کنیم.
اخبار خوب این است که Watermill تمام ابزارهای لازم را فراهم میکند! اگر از پایگاهدادهای مانند MySQL، PostgreSQL (یا هر پایگاهداده اِسکیوال دیگری)، Firestore یا Bolt استفاده میکنید، میتوانید پیامها را به آنها ارسال کنید. کامپوننت Forwarder به شما کمک میکند تا تمام پیامهایی که شما به پایگاهداده منتشر میکنید را انتخاب کرده و آنها را به پیکربندیکننده پیام منتقل کنید.
شما باید اطمینان حاصل کنید که:
- دستور شما از یک انتشارکننده کار میکند در زمینه تراکنش پایگاهداده (مثلاً SQL، Firestore، Bolt).
- کامپوننت Forwarder در حال اجراست و از اشتراکگیرنده پایگاهداده و انتشارکننده پیامها استفاده میکند.
در این حالت، دستور ممکن است به این شکل باشد:
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
برای فعال کردن کامپوننت Forwarder برای کار در پسزمینه و فوروارد کردن پیامها از MySQL به Google Pub/Sub، باید آن را به این صورت تنظیم کنید:
برای کد کامل، لطفا به github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go مراجعه کنید.
// ...
// ذخیره داده در MySQL و انتشار رویدادها به Google Cloud Pub/Sub در یک تراکنش.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("بازگردانی تراکنش به دلیل وجود خطا", watermill.LogFields{"error": err.Error()})
// در صورت خطا، به دلیل بازگردانی تراکنش MySQL، میتوانیم از وقوع سناریوهای ناخواسته زیر جلوگیری کنیم:
// - رویداد منتشر شود اما داده ذخیره نشود،
// - داده ذخیره شود اما رویداد منتشر نشود.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// دهنده انتشارکننده را با یک پوشش متوجه شده توسط کامپوننت فورواردر تزیین کنید.
publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// اعلان انتهایی قرعهکشی را منتشر کنید. لطفا توجه داشته باشید که ما از جانب انتشاردهنده MySQL دیکور شده به یک موضوع Google Cloud منتشر میکنیم.
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
اگر می خواهید بیشتر در مورد این مثال بدانید، لطفاً پیادهسازی آن را [اینجا](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go) پیدا کنید.