परिचय
मिडलवेयर का उपयोग ईवेंट फ्रेमवर्क को विस्तारित करने, कस्टम फ़ंक्शनैलिटी जोड़ने, और मुख्य हैंडलर के तार्किकता से असंबंधित महत्वपूर्ण फ़ंक्शनैलिटी प्रदान करने के लिए किया जाता है। उदाहरण के लिए, त्रुटि वापस लौटने के बाद हैंडलर को पुनः प्रयास करना, या पैनिक से बचाव करना और हैंडलर के भीतर स्टैक ट्रेस को कैप्चर करना।
मिडलवेयर फ़ंक्शन सिग्नेचर निम्नलिखित रूप में परिभाषित की जाती है:
पूर्ण सोर्स कोड: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware हमें हैंडलर के समान डेकोरेटर लिखने की अनुमति देता है।
// यह कुछ संचालन कर सकता है हैंडलर से पहले(जैसे, संवाहित संदेश को संशोधित करें)
// और हैंडलर के बाद भी कुछ संचालन कर सकता है (निर्मित संदेश को संशोधित करें, संवाहित संदेश को ACK/NACK करें, त्रुटियों का सामना करें, लॉगिंग, आदि)।
//
// इसे `AddMiddleware` विधि का उपयोग करके राउटर से जोड़ा जा सकता है।
//
// उदाहरण:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("हैंडलर को निष्पादित करने से पहले")
// निर्मितसंदेश, त्रुटि := h(message)
// fmt.Println("हैंडलर को निष्पादित करने के बाद")
//
// निर्मितसंदेश, त्रुटि
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
उपयोग
मिडलवेयर राउटर में सभी हैंडलर्स के लिए या विशिष्ट हैंडलर्स के लिए लागू किया जा सकता है। जब मिडलवेयर सीधे राउटर में जोड़ा जाता है, तो यह राउटर के लिए प्रदान किए गए सभी हैंडलर्स पर लागू किया जाएगा। यदि कोई मिडलवेयर केवल एक विशिष्ट हैंडलर के लिए लागू होता है, तो इसे राउटर में हैंडलर में जोड़ा जाना चाहिए।
यहां एक उपयोग का उदाहरण है:
github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go पूर्ण स्रोत कोड:
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// SIGTERM सिगनल प्राप्त करते समय, SignalsHandler धीरे से राउटर को बंद करेगा।
// आप इसे `r.Close()` को बुलाकर भी राउटर को बंद कर सकते हैं।
router.AddPlugin(plugin.SignalsHandler)
// राउटर स्तर का मिडलवेयर राउटर पर भेजे गए हर मैसेज पर चलाया जाएगा
router.AddMiddleware(
// CorrelationID आगंतुक मैसेज मेटाडेटा से संबंध आईडी की प्रतिलिपि बनाएगा
middleware.CorrelationID,
// अगर हैंडलर एक त्रुटि लौटाता है, तो इसे पुनः प्रयास किया जाएगा।
// इसे अधिकतम MaxRetries बार पुनः प्रयास किया जाएगा, जिसके बाद संदेश को Nacked किया जाएगा और PubSub द्वारा पुनः भेजा जाएगा।
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer हैंडलर में पैनिक को नियंत्रित करता है।
// इस मामले में, यह उन्हें त्रुटियों के रूप में Retry मिडलवेयर को पारित करता है।
middleware.Recoverer,
)
// सरलता के लिए, हम यहां गोचैनल पब / सब का उपयोग करते हैं,
// आप इसे किसी भी पब / सब अनुकरण के साथ बदल सकते हैं और यह वही तरीके से काम करेगा।
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// प्रायोजित कुछ आगंतुक मैसेज को पृष्ठभूमि में भेजें
go publishMessages(pubSub)
// AddHandler हैंडलर बनाता है जिसे हैंडलर स्तर के मिडलवेयर को जोड़ने या हैंडलर को बंद करने के लिए उपयोग किया जा सकता है।
handler := router.AddHandler(
"struct_handler", // हैंडलर नाम, यह अद्वितीय होना चाहिए
"incoming_messages_topic", // विषय जिससे घटनाएं पढ़ी जाएंगी
pubSub,
"outgoing_messages_topic", // विषय जिस पर घटनाएं प्रकाशित की जाएंगी
pubSub,
structHandler{}.Handler,
)
// हैंडलर स्तरीय मिडलवेयर केवल विशिष्ट हैंडलर्स के लिए कार्यान्वित होता है
// इस तरह के मिडलवेयर को राउटर स्तर के मिडलवेयर की तरह ही हैंडलर में जोड़ा जा सकता है
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Message.UUID के लिए हैंडलर-विशिष्ट मिडलवेयर को कार्यान्वित किया जा रहा है")
return h(message)
}
})
// केवल डिबगिंग के उद्देश्यों के लिए, हम सभी मेसेज जो `incoming_messages_topic` पर प्राप्त होते हैं, उन्हें प्रिंट करते हैं
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// केवल डिबगिंग के उद्देश्यों के लिए, हम `outgoing_messages_topic` को भेजी जाने वाली सभी घटनाएं प्रिंट करते हैं
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// अब जब सभी हैंडलर्स को पंजीकृत कर दिया गया है, हम राउटर को चला सकते हैं।
// रन तब तक ब्लॉक करेगा जब तक राउटर चल रहा हो।
// ...
उपलब्ध मिडलवेयर
यहां Watermill द्वारा प्रदान किए गए पुनरव्यवहार्य मिडलवेयर हैं, और आप भी आसानी से अपने खुद के मिडलवेयर को अमल में ला सकते हैं। उदाहरण के लिए, अगर आप प्रत्येक आने वाले संदेश को किसी विशेष प्रकार के लॉग प्रारूप में स्टोर करना चाहते हैं, तो यह सबसे अच्छा तरीका होगा।
सर्किट ब्रेकर
// सर्किट ब्रेकर एक मध्यवर्ती है जो हैंडलर को एक सर्किट ब्रेकर में ढकेलता है।
// विन्यास के आधार पर, सर्किट ब्रेकर तेजी से विफल हो जाएगा अगर हैंडलर त्रुटियाँ लौटाता रहता है।
// यह लगातार विफलताओं से बचाव के लिए उपयुक्त है।
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker एक नया CircuitBreaker मध्यवर्ती लौटाता है।
// उपलब्ध सेटिंग्स के लिए, कृपया gobreaker दस्तावेज़ का संदर्भ देखें।
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// मध्यवर्ती सर्किटब्रेकर से सर्किट ब्रेकर को लौटाता है।
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
सहयोग
// SetCorrelationID मैसेज के लिए संरेखण आईडी सेट करता है।
//
// जब एक मैसेज सिस्टम में आता है, SetCorrelationID को कॉल किया जाना चाहिए।
// जब एक मैसेज अनुरोध में उत्पन्न होता है (जैसे, HTTP), मैसेज का संरेखण आईडी अनुरोध का संबंध आईडी होना चाहिए।
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID मैसेज से संरेखण आईडी लौटाता है।
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID हैंडलर द्वारा उत्पन्न सभी मैसेजों में एक संरेखण आईडी जोड़ता है।
// आईडी हैंडलर द्वारा प्राप्त मैसेज के मैसेज आईडी पर आधारित होता है।
//
// CorrelationID को सही ढंग से काम करने के लिए, पहले मैसेज सिस्टम में प्रवेश के लिए SetCorrelationID को कॉल करना चाहिए।
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
डुप्लिकेटर
// डुप्लिकेटर मैसेज को दो बार प्रसंस्करण करता है ताकि अंतःस्थाई होने का सुनिश्चित हो।
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
त्रुटियों को नजरअंदाज करें
// IgnoreErrors हैंडलर को कुछ विशेष रूप से परिभाषित त्रुटियों को नजरअंदाज करने की एक मध्यवर्ती प्रदान करता है।
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors एक नया IgnoreErrors मध्यवर्ती बनाता है।
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// मध्यवर्ती IgnoreErrors को लौटाता है।
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
तुरंत अधिलेखण
// InstantAck हैंडलर को तत्काल प्रवेशित मैसेज का स्वीकृति करने के लिए बनाता है, चाहे कोई त्रुटि हो या न हो।
// यह प्रदर्शन में सुधार करने के लिए उपयोग किया जा सकता है, लेकिन व्यापार है:
// यदि आपको सुनिश्चित करना है कि सटीक से एक ही बार वितरण होता है, तो कम से कम एक बार वितरण हो सकता है।
// यदि आपको क्रमित संदेशों की आवश्यकता है, तो यह क्रम विफल हो सकता है।
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
जहर
// PoisonQueue एक मध्यस्थता सुविधा प्रदान करता है जो अप्रसंस्कृत संदेशों का संबोधन करने और उन्हें एक अलग विषयक में प्रकाशित करता है।
// फिर, मुख्य मध्यस्थता श्रृंखला जारी रहती है, और व्यापार सामान्य रूप से बढ़ता है।
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter PoisonQueue के समान है, लेकिन यह एक कार्य को स्थानसूची क्रमिकीया निर्धारित करने के लिए एक फ़ंक्शन स्वीकार करता है।
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
यादृच्छिक विफलता
// RandomFail हैंडलर को एक यादृच्छिक संभावना के आधार पर विफल करते हैं। त्रुटि संभावना का अनुपात (0, 1) के भीतर होना चाहिए।
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("एक यादृच्छिक त्रुटि हुई")
}
return h(message)
}
}
}
// RandomPanic हैंडलर को एक यादृच्छिक संभावना के आधार पर पैनिक करता है। पैनिक संभावना का अनुपात (0, 1) के भीतर होना चाहिए।
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("एक यादृच्छिक पैनिक हुआ")
}
return h(message)
}
}
}
रिकवरर
// RecoveredPanicError विकल्प त्रुटि और इसकी स्टैक ट्रेस सूचना को धारित करता है जो प्राप्त हुआ पैनिक त्रुटि को होल्ड करता है।
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer हैंडलर से किसी भी पैनिक को बहाल करता है और हैंडलर से वापस लौटे किसी भी त्रुटि में RecoveredPanicError का संलग्न करता है।
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
पुनः प्रयास
// Retry एक मिडलवेयर प्रदान करता है जो हैंडलर को पुनः प्रयास करता है अगर एक त्रुटि लौटाई जाती है।
// पुनः प्रयास व्यवहार, घाताक्षरी वापसी, और अधिकतम बीत गई समय को कॉन्फ़िगर किया जा सकता है।
type Retry struct {
// MaxRetries कोशिशों की अधिकतम संख्या है।
MaxRetries int
// InitialInterval पुनः प्रयास के बीच की प्रारंभिक अंतर है। उसके उपयुक्त अंतर से आगे के अंतर।
InitialInterval time.Duration
// MaxInterval पुनः प्रयास का घाताक्षरी वापसी का ऊपरी सीमा सेट करता है।
MaxInterval time.Duration
// Multiplier एक फैक्टर है जिससे इंतजार अंतर को गुणा किया जाएगा।
Multiplier float64
// MaxElapsedTime पुनः प्रयासों के लिए अधिकतम समय सीमा सेट करता है। अगर 0 है, तो यह अक्षम है।
MaxElapsedTime time.Duration
// RandomizationFactor अंशक तंत्रित करता है वापसी समय को तारीख के भीतर इस श्रेणी के अंदर।
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)]।
RandomizationFactor float64
// OnRetryHook प्रत्येक पुनः प्रयास प्रयास पर निष्पादित करने के लिए एक वैकल्पिक समारोह है।
// वर्तमान पुनः प्रयास संख्या पुनः प्रयास के माध्यम से पारित किया जाता है।
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware पुनः प्रयास मिडलवेयर को लौटाता है।
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
थ्रॉटल
// Throttle एक मिडलवेयर प्रदान करता है जो निश्चित समयावधि के भीतर प्रसंसाधन की संख्या को सीमित करने के लिए है।
// यह अनप्रसंस्कृत लंबे कतार पर चल रहे हैंडलर्स का बोझ नहीं डालने के लिए प्रयोग किया जा सकता है।
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle एक नया थ्रॉटल मिडलवेयर बनाता है।
// उदाहरण समय और गिनती: NewThrottle(10, time.Second) 10 सेकंड में 10 संदेश दिखाता है।
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware थ्रॉटल मिडलवेयर को लौटाता है।
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// मल्टीपल हैंडलर्स द्वारा साझा की गई थ्रॉटल स्वयं के "टिक्स" का इंतजार करेगा।
समय सीमा
// Timeout निर्धारित अवधि के बाद आने वाले संदेश के संकेत को रद्द करता है।
// हैंडलर की किसी भी समय सीमा-संवेदनशील विशेषताओं को फेल होने की जानकारी के लिए msg.Context().Done() को सुनना चाहिए।
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}