एक लेनदार में संदेश प्रकाशित करने का महत्व

घटना-नियंत्रित अनुप्रयोगों के साथ काम करते समय, कभी-कभी ऐसी स्थितियाँ आ सकती हैं जब आपको अनुप्रयोग स्थिति को स्थायी करने और सिस्टम के अन्य हिस्सों को यह सूचित करने के लिए संदेश प्रकाशित करने की आवश्यकता होती है। एक आदर्श स्थिति में, आप चाहेंगे कि आप अनुप्रयोग स्थिति को स्थायी करें और एक ही लेनदार में संदेश प्रकाशित करें, क्योंकि ऐसा नहीं करने से डेटा संरचना में समर्थन समस्याओं का सामना करना पड़ सकता है। डेटा भंडारण को समय पर स्थान रखने और एक से अधिक संदेश प्रकाशित करने के लिए, आपको डेटा भंडारण के लिए उपयोग की गई डेटाबेस में संदेश प्रकाशित करने की क्षमता होनी चाहिए, या फिर दो-चरणीय सम्मति (2PC) व्यवस्था को लागू करनी होगी। अगर आप संदेश ब्रोकर को डेटाबेस में बदलना नहीं चाहते और पहिया फिरसे बनाना नहीं चाहते हैं, तो आप Watermill के Forwarder component का उपयोग करके अपना काम सरल बना सकते हैं!

Forwarder Component

आप Forwarder को एक पृष्ठ की तारीक़ से सोच सकते हैं जो संदेशों को डेटाबेस में प्रकाशित होने का इंतजार करता है और सुनिश्चित करता है कि वे आखिरकार संदेश ब्रोकर तक पहुँच जाते हैं।

Forwarder को सामान्य और पारदर्शी उपयोग के लिए, इसने इंटरमीडिएट डेटाबेस पर एक विशेष विषय पर सुनवाई करने और आभूषित Forwarder प्रकाशक का उपयोग करता है। Forwarder उन्हें खोलता है और उन्हें निर्दिष्ट लक्ष्य विषय पर संदेश भेजता है।

उदाहरण

हम निम्नलिखित उदाहरण को विचार करेंगे: एक आदेश है जो लॉटरी ड्रा का प्रबंध करने के लिए उत्तरदायित्व रखता है। इसे संयुक्त रूप से पंजीकृत उपयोगकर्ता को विजेता के रूप में यादृच्छिक रूप से चुनना चाहिए। इसके दौरान, यह अपना निर्णय स्थायी रूप से रखना चाहिए और चयनित उपयोगकर्ता के आदर्श लॉटरी आईडी को जोड़ने वाले डेटाबेस एंट्री को भी स्थायी रूप से करना चाहिए। इसके अतिरिक्त, एक घटना-नियंत्रित सिस्टम के रूप में, यह भी एक LotteryConcluded घटना प्रकाशित करना चाहिए, ताकि अन्य प्रभाग संयुक्त रूप से प्रतिक्रिया कर सकें। स्पष्टता के लिए - लॉटरी विजेता को इनाम भेजने के लिए एक प्रभाग जिम्मेदार होगा। यह LotteryConcluded घटना प्राप्त करेगा और डेटाबेस एंट्री का उपयोग करके निर्धारित करेगा कि विजेता कौन है।

हमारी स्थिति में, डेटाबेस MySQL है और संदेश ब्रोकर Google Pub/Sub है, लेकिन यह किसी अन्य दो तकनीकों का भी हो सकता है।

ऐसे एक आदेश को लागू करते समय, विभिन्न दृष्टिकोण अपनाए जा सकते हैं। इसके बाद, हम तीन संभावित प्रयास पेश करेंगे और उनकी कमियों पर ध्यान देंगे।

पहले घटना प्रकाशित करें, फिर डेटा संग्रह करें

इस दृष्टिकोण में, कमांड पहले एक घटना प्रकाशित करता है और फिर डेटा संग्रहित करता है। हालांकि, इस विधि काफी संदर्भों में सही रूप से काम कर सकती है, लेकिन हमें संभावित समस्याओं की पहचान करने का प्रयास करना चाहिए।

कमांड को तीन मौलिक परिचालन करने की आवश्यकता है:

  1. एक यादृच्छिक उपयोगकर्ता A को विजेता के रूप में चुनना।
  2. एक LotteryConcluded घटना प्रकाशित करना जिससे सूचित हो कि लॉटरी B समाप्त हो गई है।
  3. डेटाबेस में संग्रहीत करना कि लॉटरी B का विजेता उपयोगकर्ता A द्वारा जीत गई है।

प्रत्येक कदम विफलता के लिए अवसरशील है, जो हमारे कमांड फ्लो को विघटित कर सकता है। यदि पहला कदम विफल होता है, तो परिणाम गंभीर नहीं होंगे - हमें बस एक त्रुटि लौटानी होगी और पूरे कमांड को विफल मानना होगा। कोई डेटा संग्रहित नहीं किया जाएगा, और कोई घटना प्रकाशित नहीं की जाएगी। हम बस कमांड को फिर से चला सकते हैं।

यदि दूसरा कदम विफल होता है, तो हमने अभी तक घटना प्रकाशित नहीं की है और न ही डेटाबेस में डेटा संग्रहित किया है। हम कमांड को फिर से चला सकते हैं और पुन: प्रयास कर सकते हैं।

सबसे दिलचस्प परिस्थिति यह है कि तीसरा कदम विफल होने पर क्या होता है। दूसरे कदम के बाद, हमने पहले से ही घटना प्रकाशित कर दी है, लेकिन अंत में कोई डेटा डेटाबेस में संग्रहित नहीं होगा। अन्य घटक सिग्नल प्राप्त करेंगे कि लॉटरी समाप्त हो गई है, लेकिन घटना में भेजे गए लॉटरी आईडी से कोई विजेता नहीं होगा। वे विजेता की पुष्टि नहीं कर पाएंगे, इसलिए उनके कार्य भी विफल माने जाने चाहिए।

हम इस स्थिति से अभी भी बाहर निकल सकते हैं, लेकिन इसे कुछ मैनुअल कार्यों की आवश्यकता हो सकती है, जैसे कि पहले से भेजी गई घटना के लॉटरी आईडी का उपयोग करके कमांड को फिर से चलाना।

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. पहले Google Cloud Pub/Sub पर घटना प्रकाशित करें, फिर MySQL में डेटा संग्रहीत करें।
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// यदि कोई त्रुटि है, तो घटना भेजी गई है लेकिन डेटा अभी तक सहेजा नहीं गया है।
	if err = simulateError(); err != nil {
		logger.Error("डेटा संग्रहण में विफल", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

पहले डेटा स्टोर करें, फिर इवेंट प्रकाशित करें

दूसरी दृष्टि कोण में, हम पहले दृष्टि कोण की कमियों का समाधान करने का प्रयास करेंगे। डेटाबेस में स्थिति सही रूप से स्थायित न होने और इवेंट्स प्रकाशित न होने के समय असफलता को बाहरी कॉम्पोनेंट्स को लीक नहीं होने देने के लिए, हम क्रियाओं का क्रम निम्नलिखित रूप में बदलेंगे:

  1. यूजर A को बिना चयन किए लॉटरी B के विजेता के रूप में चुनें।
  2. जानकारी को स्थायित करें कि लॉटरी B को यूजर A ने जीता है डेटाबेस में।
  3. LotteryConcluded इवेंट प्रकाशित करें, जिसमें सूचित किया जाए कि लॉटरी B समाप्त हो चुकी है।

पहले दृष्टि के उपाय की तरह, अगर पहले दो क्रियाएँ असफल होती हैं, तो हमारे पास कोई परिणाम नहीं हैं। तीसरी क्रिया के असफल हो जाने की स्थिति में, हमारे डेटा डेटाबेस में स्थायित हो जाएगा, लेकिन कोई इवेंट प्रकाशित नहीं होगा। इस मामले में, हम लॉटरी के बाहरी कॉम्पोनेंट्स को असफलता की स्थिति लीक नहीं करेंगे। लेकिन, अपेक्षित सिस्टम व्यवहार को ध्यान में रखते हुए, हमारा विजेता पुरस्कार प्राप्त नहीं कर पाएगा क्योंकि कोई इवेंट इस क्रिया के लिए जिम्मेदार कंपोनेंट को पारित नहीं किया जाएगा।

इस समस्या का हल भी मैन्युअल क्रियादिवस से कर दिया जा सकता है, अर्थात, इवेंट को मैन्युअल रूप से प्रकाशित करके। लेकिन हम बेहतर कर सकते हैं।

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. पहले डेटा को MySQL में स्थायित करें, और फिर सीधे इवेंट को Google Cloud Pub/Sub में प्रकाशित करें।
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

// यदि यह असफल होता है, तो हमारा डेटा पहले से ही स्थायित हो गया है, लेकिन कोई इवेंट प्रकाशित नहीं हुआ है।
	if err = simulateError(); err != nil {
		logger.Error("इवेंट प्रकाशित करने में विफल", err, nil)
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}
// ...

सूचना रखना और घटनाएँ लेखित परिस्थितियों में प्रकाशित करना

सोचिए कि हमारे कमांड समय पर दूसरा और तीसरा कार्य एक साथ कर सकता है। वे परमाणुरूप से प्रतिबद्ध किए जाएंगे, जिसका अर्थ है कि अगर एक कार्य विफल होता है, तो दूसरा सफल नहीं हो सकता। यह सभी डेटाबेस में पहले से ही लागू किए गए लेनदार मेकेनिज्म का उपयोग करके प्राप्त किया जा सकता है। हमारे उदाहरण में उपयोग किया गया MySQL इसमें से एक है।

डेटा संग्रहण और घटनाएँ एक ही लेनदार में संग्रहीत करने के लिए, हमें MySQL में संदेश प्रकाशित करने की क्षमता होनी चाहिए। हमें पूरे सिस्टम के लिए समर्थित होने के लिए संदेश ब्रोकर को MySQL द्वारा समर्थित करना नहीं चाहिए। इसे प्राप्त करने के अन्य तरीके ढूँढने की आवश्यकता है।

अच्छी खबर यह है: Watermill सभी आवश्यक साधन प्रदान करता है! यदि आप MySQL, PostgreSQL (या किसी अन्य SQL डेटाबेस), Firestore, या Bolt जैसा डेटाबेस उपयोग कर रहे हैं, तो आप उन्हें संदेश प्रकाशित कर सकते हैं। Forwarder घटक आपको उन सभी संदेशों का चयन करने में मदद करेगा जो आप डेटाबेस में प्रकाशित करते हैं और उन्हें आपके संदेश ब्रोकर को आगे बढ़ाता है।

आपको सुनिश्चित करना होगा कि:

  1. आपका कमांड एक पब्लिशर का उपयोग डेटाबेस लेनदार कार्यालय के सन्दर्भ में करता है (जैसे SQL, Firestore, Bolt)।
  2. Forwarder घटक चल रहा है, डेटाबेस सब्सक्राइबर और संदेश ब्रोकर पब्लिशर का उपयोग करता है।

इस मामले में, कमांड इस तरह दिख सकता है:

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

Forwarder घटक को पीछे से काम करने और MySQL से Google Pub/Sub को संदेश अग्रक्षित करने के लिए, आपको निम्नलिखित रूप में सेट करना होगा:

पूरे कोड के लिए, कृपया देखें github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// डेटा को MySQL में संग्रहित करें और घटनाएँ Google Cloud Pub/Sub में प्रकाशित करें एक लेनदार कार्यालय में।
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			tx.Commit()
		} else {
			logger.Info("त्राणसक्षम्ता को वापस लेने का निर्णय एक त्रुटि के कारण", watermill.LogFields{"error": err.Error()})
			// त्रुटि की स्थिति में, MySQL लेनदार कार्यालय के द्वारा वापस ले जाने के मामले में, हम सुनिश्चित कर सकते हैं कि निम्नलिखित अवांछित परिस्थितियों का निराकरण नहीं होता है:
			// - घटना प्रकाशित हो गई लेकिन डेटा संग्रहित नहीं हुआ,
			// - डेटा संग्रहित हो गया लेकिन घटना प्रकाशित नहीं हुई।
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// प्रकाशक को एक ऐसे लिफाफे के साथ बनावट दें जिसे फॉरवर्डर घटक द्वारा समझा जा सकता है।
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// लॉटरी समाप्ति घटना का एक संकेत प्रकाशित करें। कृपया ध्यान दें कि हम यहाँ एक Google Cloud विषय को प्रकाशित करते हैं जो सजीव MySQL प्रकाशक का उपयोग करते हुए।
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...
यदि आप इस उदाहरण के बारे में और अधिक जानना चाहते हैं, तो कृपया इसके अंमलण को [यहाँ](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go) खोजें।