أهمية نشر الرسائل في العمليات المالية
عند التعامل مع تطبيقات تعتمد على الأحداث، قد تكون هناك أوقات تحتاج فيها إلى الاحتفاظ بحالة التطبيق ونشر الرسائل لإبلاغ أجزاء أخرى من النظام بما حدث للتو. في scenIdeal، تريد أن تحتفظ بحالة التطبيق وتنشر رسالة dentro transacción واحدة، لأن عدم القيام بذلك يمكن أن يؤدي إلى مشاكل تتعلق بتناسق البيانات. للتزامن في الالتزام بتخزين البيانات ونشر الأحداث في عملية واحدة ، يجب أن تكون قادراً على نشر الرسائل إلى نفس قاعدة البيانات المستخدمة لتخزين البيانات، أو تنفيذ آلية الالتزام ثنائية المرحلة (2PC). إذا لم ترغب في تغيير وسيط الرسائل وتريد تبسيط عملك، يمكنك استخدام مكون Forwarder الخاص بـ Watermill!
مكون Forwarder
يمكنك أن تفكر في المكون Forwarder كـ ديمون خلفي ينتظر نشر الرسائل إلى قاعدة البيانات ويضمن أن تصل في النهاية إلى وسيط الرسائل.
لجعل Forwarder جينيريكيًا وشفافًا في الاستخدام، يستمع إلى موضوع مخصص على قاعدة البيانات الوسيطة ويستخدم ناشر Forwarder مزخرف لإرسال الرسائل المغلفة. يقوم Forwarder بفك تشفيرها وإرسالها إلى الموضوع المستهدف المحدد على وسيط الرسائل.
مثال
دعنا نأخذ المثال التالي: هناك أمر مسؤول عن إجراء سحب اليانصيب. يجب عليه اختيار مستخدم مسجل في النظام بشكل عشوائي كفائز. خلال ذلك، يجب أيضًا أن يحتفظ بقراره من خلال تخزين إدخال قاعدة البيانات يربط معرف اليانصيب الفريد بمعرف المستخدم المحدد. بالإضافة إلى ذلك، كنظام محرك الأحداث، يجب أن ينشر أيضًا حدث LotteryConcluded
، بحيث يمكن لأجزاء أخرى ردود الفعل بشكل مناسب. ليكون دقيقًا - سيكون هناك مكون مسؤول عن إرسال الجوائز إلى الفائز في اليانصيب. سيستلم حدث LotteryConcluded
ويتحقق من إدخال قاعدة البيانات المرتبط باستخدام معرف اليانصيب المدمج لتحديد الفائز.
في سيناريونا، قاعدة البيانات هي MySQL ووسيط الرسائل هو Google Pub/Sub، ولكن يمكن أن تكون أي تقنيات أخرى.
عند تنفيذ مثل هذا الأمر، يمكن اتخاذ مقااربات مختلفة. فيما يلي، سنقدم ثلاث محاولات ممكنة ونشير إلى نقاط ضعفها.
قم بنشر الحدث أولاً، ثم قم بتخزين البيانات
في هذا النهج، يتم نشر الأمر أولاً ثم يتم تخزين البيانات. على الرغم من أن هذا الأسلوب قد يعمل بشكل صحيح في معظم الحالات، دعونا نحاول تحديد المشاكل المحتملة.
يحتاج الأمر إلى تنفيذ ثلاث عمليات أساسية:
- اختيار مستخدم عشوائي
A
كفائز. - نشر حدث
LotteryConcluded
لإعلام أن اليانصيبB
قد انتهى. - تخزين في قاعدة البيانات أن اليانصيب
B
قد فاز به مستخدمA
.
كل خطوة عرضة للفشل، مما يمكن أن يعطل تدفق أمرنا. إذا فشلت الخطوة الأولى، فإن العواقب لن تكون خطيرة - نحتاج فقط إلى إرجاع خطأ والنظر في أن الأمر بأكمله فشل. لن يتم تخزين أي بيانات، ولن يتم نشر أي حدث. يمكننا ببساطة إعادة تشغيل الأمر.
إذا فشلت الخطوة الثانية، فلا يزال لم يتم نشر الحدث أو تخزين البيانات في قاعدة البيانات. يمكننا إعادة تشغيل الأمر والمحاولة مرة أخرى.
السيناريو الأكثر إثارة للاهتمام هو ما يحدث إذا فشلت الخطوة الثالثة. بعد الخطوة الثانية، لقد نشرنا بالفعل الحدث، ولكن في النهاية لن يتم تخزين أي بيانات في قاعدة البيانات. سيتلقى المكونات الأخرى الإشارة بأن اليانصيب قد انتهى، ولكن لن يكون هناك فائز مرتبط بمعرف اليانصيب الذي تم إرساله في الحدث. لن يتمكنوا من التحقق من الفائز، لذا يجب أيضًا اعتبار عملياتهم كفاشلة.
لا يزال بإمكاننا الخروج من هذا الوضع، ولكن قد يتطلب بعض العمليات اليدوية، مثل إعادة تشغيل الأمر باستخدام معرف اليانصيب من الحدث الذي تم إرساله بالفعل.
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. قم بنشر الحدث في Google Cloud Pub/Sub أولاً، ثم قم بتخزين البيانات في MySQL
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// إذا كان هناك خطأ، فقد تم إرسال الحدث ولكن لم يتم حفظ البيانات بعد.
if err = simulateError(); err != nil {
logger.Error("فشل في حفظ البيانات", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
قم بتخزين البيانات أولاً ثم نشر الحدث
في النهج الثاني، سنحاول معالجة نقاط الضعف في طريقة "العنوان أولاً". من أجل منع تسرب حالة الفشل إلى المكونات الخارجية عند عدم تخزين الحالة بشكل صحيح في قاعدة البيانات وعدم نشر الأحداث، سنقوم بتغيير ترتيب الإجراءات على النحو التالي:
- تحديد مستخدم عشوائي "A" كفائز باليانصيب "B".
- الاحتفاظ بالمعلومات التي فازت اليانصيب "B" بواسطة المستخدم "A" في قاعدة البيانات.
- نشر حدث
LotteryConcluded
، لإبلاغ أن اليانصيب "B" قد انتهى.
مماثل للنهج الأول، إذا فشلت الإجراءات الأوليتين، فلن يكون لدينا عواقب. في حال فشل الإجراء الثالث، سيتم الاحتفاظ ببياناتنا في قاعدة البيانات، لكن لن يتم نشر أي حدث. في هذه الحالة، لن نكشف عن حالة الفشل للمكونات خارج اليانصيب. ومع ذلك، ونظراً للسلوك المتوقع للنظام، فإن فائزنا لن يتمكن من استلام الجائزة لأنه لم يتم تمرير أي حدث إلى المكون المسؤول عن هذا الإجراء.
يمكن أيضًا حل هذه المشكلة يدويًا، أي يدويًا بنشر الحدث. لكن يمكننا القيام بذلك بشكل أفضل.
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 2. الاحتفاظ بالبيانات في MySQL أولاً، ثم نشر الحدث مباشرة إلى Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// إذا فشل هذا، فإن بياناتنا قد تم الاحتفاظ بها بالفعل، لكن لم يتم نشر أي حدث.
if err = simulateError(); err != nil {
logger.Error("فشل في نشر الحدث", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
تخزين البيانات ونشر الأحداث في عملية واحدة
تخيل أن أمرنا يمكنه أن ينفذ المهام الثانية والثالثة في نفس الوقت. سيتم تنفيذهما بشكل ذري، مما يعني أنه إذا فشلت إحدى المهام، فإن الأخرى لا يمكن أن تنجح. يمكن تحقيق ذلك من خلال استغلال آلية العمليات النقدية المنفذة بالفعل في معظم قواعد البيانات. وأحد هذه القواعد هو MySQL المستخدم في مثالنا.
من أجل تخزين البيانات ونشر الأحداث في عملية واحدة، نحتاج إلى القدرة على نشر الرسائل إلى MySQL. نظرًا لأننا لا نرغب في تغيير وسيط الرسائل ليتم دعم MySQL في جميع أنحاء النظام بأكمله، فيجب أن نجد طرقًا أخرى لتحقيق ذلك.
الخبر الجيد هو أن Watermill يوفر جميع الأدوات اللازمة! إذا كنت تستخدم قاعدة بيانات مثل MySQL، PostgreSQL (أو أي قاعدة بيانات SQL أخرى)، Firestore، أو Bolt، يمكنك نشر الرسائل إليها. سيُساعدك مكون Forwarder في تحديد جميع الرسائل التي تقوم بنشرها إلى قاعدة البيانات وإعادة توجيهها إلى وسيط الرسائل الخاص بك.
عليك أن تضمن:
- استخدام أمرك لناشر يعمل في سياق عملية قاعدة بيانات (مثل SQL، Firestore، Bolt).
- تشغيل مكون Forwarder باستخدام مشترك قاعدة بيانات وناشر وسيط رسائل.
في هذه الحالة، قد يكون شكل الأمر كما يلي:
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
لتمكين مكون Forwarder من العمل في الخلفية وإعادة توجيه الرسائل من MySQL إلى Google Pub/Sub، يجب أن تقوم بإعداده على النحو التالي:
للحصول على الشيفرة الكاملة، يُرجى الرجوع إلى github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// التخزين الدائم للبيانات في MySQL ونشر الأحداث إلى Google Cloud Pub/Sub في عملية واحدة.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("التراجع عن العملية بسبب حدوث خطأ", watermill.LogFields{"error": err.Error()})
// في حالة الخطأ، نظرًا لتراجع عملية MySQL transaction، يمكننا التأكد من عدم حدوث السيناريوهات غير المرغوب فيها التالية:
// - تم نشر الحدث ولم يتم التصديق على البيانات،
// - تم التصديق على البيانات ولكن لم يتم نشر الحدث.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// تزيين الناشر بظرف فهمه من قِبل مكون الإعادة التوجيه.
publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// نشر إعلان عن انتهاء سحب اليانصيب. يرجى ملاحظة أننا نقوم بنشره إلى موضوع Google Cloud هنا باستخدام الناشر MySQL المزين.
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
إذا كنت ترغب في معرفة المزيد حول هذا المثال، يرجى العثور على تنفيذه [هنا](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).