प्रकाशक और सदस्य Watermill के निचले स्तरीय भाग हैं। व्यावसायिक अनुप्रयोगों में, आमतौर पर आपको संघबंधन, मैट्रिक्स, पॉयसन संदेश कतार, पुन: प्रयास, दर की सीमिता, आदि जैसे उच्च स्तरीय इंटरफेस और कार्यों का उपयोग करना चाहिए।

कभी-कभी, आपको सफल प्रसंस्करण होने पर पुनः प्रकारिति नहीं करना चाहिए। कई बार, आपको यह चाहिए कि एक संदेश को प्रसंस्कृत करने के बाद एक और संदेश भेजना हो।

इन आवश्यकताओं को पूरा करने के लिए, राउटर नामक एक घटक होता है।

Watermill Router

##विन्यास

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout हैंडलरों की बंद करते समय राउटर को कितनी देर काम करनी चाहिए, यह तय करता है।
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate यह जांचता है कि राउटर कॉन्फ़िगरेशन में कोई त्रुटियां हैं या नहीं।
func (c RouterConfig) Validate() error {
	return nil
}
// ...

हैंडलर

सबसे पहले, आपको HandlerFunc फ़ंक्शन को लागू करना होगा:

सोर्स कोड पूरा करें: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc फ़ंक्शन वह फ़ंक्शन है जो जब एक संदेश प्राप्त होता है तो बुलाया जाता है।
// 
// जब HandlerFunc त्रुटि नहीं वापस करता है, तो msg.Ack() स्वचालित रूप से बुलाया जाएगा।
// 
// जब HandlerFunc त्रुटि वापस करता है, तो msg.Nack() बुलाया जाएगा।
// 
// जब Handler में msg.Ack() को बुलाया जाता है और HandlerFunc त्रुटि वापस करता है,
// msg.Nack() भेजा नहीं जाएगा क्योंकि पहले से ही Ack भेज दिया गया है।
// 
// जब एक से अधिक संदेश प्राप्त किए जाते हैं (msg.Ack() को HandlerFunc में भेज दिया जाने के कारण या Subscriber में कई उपभोक्ताओं का समर्थन करने के कारण),
// HandlerFunc को समकालिक रूप से कार्यान्वित किया जाएगा।
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

अब आपको Router.AddHandler का उपयोग करके एक नया हैंडलर जोड़ना होगा:

पूरा सोर्स कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler एक नया हैंडलर जोड़ता है।

// handlerName को अद्वितीय होना चाहिए। वर्तमान में, यह केवल डीबगिंग के लिए उपयोग होता है।

// subscribeTopic में हैंडलर को संदेश मिलेंगे।

// publishTopic में हैंडलर द्वारा वापस मिले संदेश रूटर द्वारा उत्पन्न किए जाएंगे।

// जब हैंडलर को विभिन्न विषयों पर प्रकाशित करने की आवश्यकता होती है,

// सिफारिश की जाती है केवल प्रकाशक को हैंडलर में इंजेक्ट करना या मिडलवेयर का अनुमान लगाना,

// जो मेटाडेटा पर आधारित संदेशों को पकड़ सकता है और विशिष्ट विषयों पर प्रकाशित कर सकता है।

// यदि रूटर पहले से ही चल रहा है और एक नया हैंडलर जोड़ा जाता है, तो RunHandlers() को स्पष्ट रूप से कॉल करना चाहिए।

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("हैंडलर जोड़ रहा है", watermill.LogFields{

		"handler_name": handlerName,

		"topic":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		name:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped is not always waiting for handlerAdded

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler एक नया हैंडलर जोड़ता है।

// इस हैंडलर को संदेश वापस नहीं कर सकता है।

// जब यह एक संदेश वापस करता है, तो एक त्रुटि हो जाती है और एक Nack भेजा जाता है।

//

// handlerName को अद्वितीय होना चाहिए। वर्तमान में, यह केवल डीबगिंग के लिए उपयोग होता है।

// subscribeTopic में हैंडलर को संदेश मिलेंगे।

// subscriber एक सब्सक्राइबर है जो संदेश को सेवन करने के लिए उपयोग किया जाता है।

// यदि रूटर पहले से ही चल रहा है और एक नया हैंडलर जोड़ा जाता है, तो RunHandlers() को स्पष्ट रूप से कॉल करना चाहिए।

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

"प्रारंभ होनेवाला" में उपयोग के उदाहरण का संदर्भ दें। पूरा सोर्स कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler एक हैंडलर लौटाता है जो हैंडलर स्तरीय मिडलवेयर जोड़ने या हैंडलरों को रोकने के लिए उपयोग किया जा सकता है।
	handler := router.AddHandler(
		"struct_handler",          // हैंडलर नाम, यह अनुप्रयोग के लिए विशिष्ट होना चाहिए
		"incoming_messages_topic", // विषय जिससे घटनाएँ पढ़ी जाती हैं
		pubSub,
		"outgoing_messages_topic", // घटनाएँ प्रकाशित करने का विषय
		pubSub,
		structHandler{}.Handler,
	)

	// हैंडलर स्तरीय मिडलवेयर केवल विशिष्ट हैंडलरों के लिए कार्यान्वित होता है
	// इस मिडलवेयर को राउटर स्तरीय मिडलवेयर की तरह ही जोड़ा जा सकता है
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("हैंडलर-विशिष्ट मिडलवेयर को क्रियान्वित किया जा रहा है, संदेश UUID: ", message.UUID)

			return h(message)
		}
	})
// ...

कोई प्रकाशक हैंडलर

हर हैंडलर एक नया संदेश उत्पन्न नहीं करेगा। आप Router.AddNoPublisherHandler का उपयोग इस प्रकार के हैंडलर जोड़ने के लिए कर सकते हैं:

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler एक नया हैंडलर जोड़ता है।
// इस हैंडलर से संदेश प्राप्त नहीं किया जा सकता।
// जब यह एक संदेश वापस करता है, तो त्रुटि होगी और Nack भेजा जाएगा।
//
// handlerName को अद्वितीय बनाना चाहिए और वर्तमान में केवल डिबगिंग के उद्देश्यों के लिए उपयोग किया जाता है।
//
// subscribeTopic वह विषय है जिस पर हैंडलर संदेश प्राप्त करेगा।
//
// subscriber का उपयोग संदेश का सेवन करने के लिए किया जाता है।
//
// यदि आप एक हैंडलर को वह पहले से ही चल रहे रूटर में जोड़ते हैं, तो आपको स्पष्ट रूप से RunHandlers() को कॉल करना होगा।
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

स्वीकृति

डिफ़ॉल्ट रूप में, जब HanderFunc कोई त्रुटि नहीं वापस करता है, msg.Ack() को कॉल किया जाएगा। अगर एक त्रुटि वापस की जाती है, तो msg.Nack() को कॉल किया जाएगा। इसलिए, संदेश का संबोधन करने के बाद, आपको msg.Ack() या msg.Nack को कॉल करने की आवश्यकता नहीं है (यहां तक कि आप चाहें तो कर सकते हैं)।

संदेश उत्पन्न करना

जब हैंडलर द्वारा कई संदेश वापस किए जाते हैं, तो कृपया नोट करें कि अधिकांश प्रकार के प्रकाशक कार्यान्वयन संदेश का परमाणुक तैनाती समर्थन नहीं करते हैं। यदि ब्रोकर या स्टोरेज अनुपलब्ध है, तो केवल कुछ संदेश उत्पन्न हो सकते हैं और msg.Nack() भेजा जाएगा।

यदि यह समस्या है, तो प्रत्येक हैंडलर केवल एक संदेश प्रकाशित करने का विचार करें।

राउटर चलाना

राउटर को चलाने के लिए, आपको Run() को कॉल करने की आवश्यकता होगी।

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run सभी प्लगइन्स और हैंडलर को चलाता है, और दिए गए विषयों की सदस्यता लेना शुरू करता है।
// इस कॉल के दौरान राउटर चल रहा होता है।
//
// जब सभी हैंडलर बंद हो जाते हैं (उदाहरण के लिए क्योंकि सदस्यता बंद कर दी गई है), तो राउटर भी बंद हो जाएगा।
//
// Run() को बंद करने के लिए, आपको राउटर पर Close() कॉल करना चाहिए।
//
// ctx को सभी सदस्यों तक पहुंचाया जाएगा।
//
// जब सभी हैंडलर बंद हो जाते हैं (उदाज़रण: बंद कनेक्शन के कारण), Run() भी बंद हो जाएगा।
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

सुनिश्चित करें कि राउटर चल रहा है

राउटर के चल रहे होने की समझ अच्छी हो सकती है। आप इसे Running() मेथड का उपयोग करके प्राप्त कर सकते हैं।

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running राउटर चल रहे हैं जब बंद होते हैं।
// अर्थात, आप इस रूप में रोक सकते हैं कि कि राउटर चल रहां है:
// 
// 	fmt.Println("राउटर शुरू कर रहा है")
//	go r.Run(ctx)
//	//	fmt.Println("राउटर चल रहा है")
//
// चेतावनी: ऐतिहासिक कारणों के लिए, इस चैनल को राउटर के बंद होने के बारे में पता नहीं है - अगर राउटर चलता रहता है और फिर बंद होता है तो यह बंद हो जाएगा।
func (r *Router) Running() chan struct{} {
// ...
}

आप IsRunning फ़ंक्शन का भी उपयोग कर सकते हैं जो बूलियन मान वापस करता है:

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning राउटर चल रहा है जब बंद होते हैं।
//
// चेतावनी: ऐतिहासिक कारणों के लिए, इस मेथड को राउटर के बंद होने की स्थिति के बारे में पता नहीं होता है।
// अगर आप जानना चाहते हैं कि क्या राउटर बंद हो गया है, तो IsClosed का उपयोग करें।
func (r *Router) IsRunning() bool {
// ...
}

राउटर बंद करें

राउटर बंद करने के लिए, आपको Close() को बुलाना होगा।

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() सभी प्रकाशकों और सब्सक्राइबर्स को बंद करेगा, और सभी हैंडलर्स को पूरा होने का इंतजार करेगा।

Close() विन्यास में RouterConfig.CloseTimeout में सेट किए गए टाइमआउट के लिए प्रतीक्षा करेगा। अगर समय सीमा तक पहुंच जाता है, तो Close() एक त्रुटि लौटाएगा।

राउटर चालू होने के बाद हैंडलर जोड़ना

आप राउटर चल रहा होने पर एक नया हैंडलर जोड़ सकते हैं। इसके लिए, आपको AddNoPublisherHandler या AddHandler को बुलाना होगा, और फिर RunHandlers को बुलाना होगा।

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

चल रहे हैंडलर्स को रोकना

आप Stop() को बुलाकर केवल एक चल रहा हैंडलर को रोक सकते हैं।

कृपया ध्यान दें कि जब कोई चल रहे हैंडलर्स नहीं होते हैं, तो राउटर बंद हो जाएगा।

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if the handler was stopped with the Stopped() function.
func (h *Handler) Stop() {
// ...

निष्पादन मॉडल

सब्सक्राइबर एक ही संदेश को अकेले क्रमश: का ग्रहण कर सकते हैं या पैरालेल में कई संदेशों का ग्रहण कर सकते हैं।

  • Single message flow सबसे सरल विधि है, जिसका अर्थ है कि सब्सक्राइबर्स को msg.Ack() बुलाया जाने तक कोई नया संदेश प्राप्त नहीं होता है।
  • Multiple message flow केवल कुछ सब्सक्राइबर्स द्वारा समर्थित है। मल्टीपल विषय विभाजनों को सहायकता देकर, पैरालेल में कई संदेशों का ग्रहण किया जा सकता है, यहां तक कि पूर्व में मान्यता प्राप्त नहीं होती थी (जैसे, कैसे काफ्का सब्सक्राइबर्स काम करते हैं)। राउटर इस मॉडल को HandlerFunc को पैरालेल में चलाकर प्रसंस्करण करता है।

समर्थित निष्पादन मॉडल को समझने के लिए चयनित पब/सब दस्तावेज़ीकरण का संदर्भ दें।

मिडलवेयर

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware हमें HandlerFunc के लिए एक डेकोरेटर की तरह कुछ लिखने की अनुमति देता है।
// यह कुछ कार्यों को पूर्व में (उदाहरण के लिए, ग्रहण किए गए संदेश को संशोधित करें) या हैंडलर के बाद (उत्पन्न किए गए संदेश को संशोधित करें, ग्रहण/अस्वीकृत किए गए संदेश का संभालन करें, त्रुटियाँ हैंडल करें, लॉग करें, आदि) चला सकता है।
//
// इसे राउटर को `AddMiddleware` विधि का उपयोग करके जोड़ा जा सकता है।
//
// उदाहरण:
//
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("हैंडलर से पहले निष्पादित")
// 			utpaditMessages, err := h(message)
// 			fmt.Println("हैंडलर के बाद निष्पादित")
//
// 			utpaditMessages, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

मिडलवेयर के पूरे सामान्य सूची को Middlewares में पा सकते हैं।

प्लगइन

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin राउटर शुरू होते ही एक फ़ंक्शन होता है जिसे निष्पादित किया जाता है।
type RouterPlugin func(*Router) error

// ...

प्लगइन की पूरी सामान्य सूची message/router/plugin में पा सकते हैं।

संदर्भ

प्रत्येक हैंडलर द्वारा प्राप्त हुए प्रत्येक संदेश के लिए कंटेक्स्ट में कुछ उपयोगी मूल्य संग्रहीत होते हैं:

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx कंटेक्स्ट से संदेश हैंडलर का नाम लौटाता है जो संदेश को कंस्यूम करता है।
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx कंटेक्स्ट से संदेश प्रकाशक के नाम को लौटाता है जो संदेश प्रकार की श्रोता से आया है।
// उदाहरण के लिए, काफ्का के लिए, यह `kafka.Publisher` होगा।
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx कंटेक्स्ट से संदेश ग्राहक के नाम को लौटाता है जो संदेश प्रकार की श्रोता से आया है।
// उदाहरण के लिए, काफ्का के लिए, यह `kafka.Subscriber` होगा।
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx कंटेक्स्ट से विषय को लौटाता है जिससे संदेश संरक्षित किया गया था वह भी कंटेक्स्ट से पूर्वानुमान द्वारा।
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx कंटेक्स्ट से विषय को लौटाता है जिस पर संदेश प्रकाशित किया जाएगा वह भी कंटेक्स्ट से पूर्वानुमान द्वारा।
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...