مقدمہ

مِڈل ویئر اس کا استعمال کرتا ہے تاکہ واقعہ فریم ورک کو بڑھایا جا سکے، اپنی مرضی کی فعالیت جمع کر سکے، اور اہم فعالیت فراہم کرے جو مرکزی ہینڈلر کی منطق سے منسلک نہ ہو۔ مثال کے طور پر، خرابی کے بعد ہینڈلر کو دوبارہ کوشش کرنا، یا پینک سے بحرانی حالت میں دوبارہ بےت کار ہونا اور ہینڈلر کے اندر سٹیک ٹریس سے نشانہ بنانا۔

مِڈل ویئر کا فنکشن سگنیچر مندرجہ ذیل ہے:

مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware ہمیں ہینڈلر کے متشابہ ڈیکوریٹرز لکھنے کی اجازت دیتا ہے۔
// یہ کچھ عمل کر سکتا ہے ہینڈلر سے پہلے (مثلاً، استعمال کردہ پیغام کو ترتیب دینا)
// اور ہینڈلر کے بعد بھی کچھ عمل کر سکتا ہے (پیدا کردہ پیغام کو ترتیب دینا، استعمال کردہ پیغام کو ACK/NACK کرنا، خرابیوں کا سامنا کرنا، لاگنگ وغیرہ)۔
//
// اسے راؤٹر میں منسلک کیا جا سکتا ہے، `AddMiddleware` میں استعمال کر کے۔
//
// مثال:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("ہینڈلر کو اجراء کرنے سے پہلے")
//			اُفرادہ پیغامز، خرابی := h(message)
//			fmt.Println("ہینڈلر کو اجراء کرنے کے بعد")
//
//			return اُفرادہ پیغامز، خرابی
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

استعمال

Middleware راؤٹر میں تمام ہینڈلرز پر لاگو کیا جا سکتا ہے یا مخصوص ہینڈلرز پر بھی۔ جب میڈل ویئر راوٹر میں مستقیم طور پر شامل کیا جاتا ہے تو یہ راوٹر کے لیے موجود تمام ہینڈلرز پر لاگو کیا جائے گا۔ اگر کسی میڈیویر کو صرف ایک خاص ہینڈلر کے لیے لاگو کیا گیا ہے تو یہ راوٹر میں ہینڈلر میں شامل کیا جانا چاہئے۔

یہاں استعمال کے ایک مثال ہے۔

مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
     router, err := message.NewRouter(message.RouterConfig{}, logger)
     if err != nil {
         panic(err)
     }

     // SIGTERM سگنل کو سازگاری سے راوٹر بند کرنے والا ہے۔
     // آپ `r.Close()` کو بھی بند کرسکتے ہیں۔
     router.AddPlugin(plugin.SignalsHandler)

     // راوٹر-سطح کا میڈیویر ہر پیغام کو راوٹر بھیجا جائے گا
     router.AddMiddleware(
         // CorrelationID تعلق ID کا استنساب داخلی پیغام میٹا ڈیٹا سے پیدا کرے گا
         middleware.CorrelationID,

         // اگر ہینڈلر ایک خرابی لوٹاتا ہے تو یہ دوبارہ کوشش کرے گا
         // یہ حد سے زیادہ MaxRetries مرتبہ دوبارہ کرے گا، اس کے بعد پیغام پروس بیک اور بحال کردیا جائے گا۔
         middleware.Retry{
             MaxRetries:      3,
             InitialInterval: time.Millisecond * 100,
             Logger:          logger,
         }.Middleware,

         // Recoverer ہینڈلر میں پینک کا سامنا کریگا۔
         // اس موقع میں، یہ انہیں خرابیاں واپسی میڈیویر کے طور پر پاس کرے گا۔
         middleware.Recoverer,
     )

     // آسانی کے لیے، ہم یہاں gochannel Pub/Sub استمعال کرتے ہیں،
     // آپ اسے کسی بھی Pub/Sub تنظیم کے ساتھ تبدیل کرسکتے ہیں اور یہ بالکل کام کرے گا۔
     pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

     // پیچیدگی میں، ہم پیغامات کو پیش کرنے کے لیے ہم gochannel Pub/Sub استعمال کرتے ہیں
     go publishMessages(pubSub)

     // AddHandler ایک ہینڈلر بند کرتا ہے جو ہینڈلر-سطح کے میڈیویر ہیں
     // یا ہینڈلر کو بند کرنے کے لیے استعمال کیا جاسکتا ہے۔
     handler := router.AddHandler(
         "struct_handler",          // ہینڈلر نام، یکتا ہونا چاہئے
         "incoming_messages_topic", // وہ موضوع جہاں سے واقعات پڑھے جائیں گے
         pubSub,
         "outgoing_messages_topic", // وہ موضوع جہاں واقعات شائع کیا جائے گا
         pubSub,
         structHandler{}.Handler,
     )

     // ہینڈلر-سطح کا میڈیویر صرف مخصوص ہینڈلر کے لئے نافذ ہوتا ہے
     // اس طرح کے میڈیویر کو راوٹر کے میں یہ اضافی میڈیویر کا طریقہ کے ساتھ جوڑا جا سکتا ہے
     handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
         return func(message *message.Message) ([]*message.Message, error) {
             log.Println("message.UUID کے لئے ہینڈلر-مخصوص میڈیویر کا انجام دینا ہے")

             return h(message)
         }
     })

     // صرف ڈیبگنگ مقصد کے لئے، ہم "incoming_messages_topic" پر موجود تمام پیغامات چھاپتے ہیں
     router.AddNoPublisherHandler(
         "print_incoming_messages",
         "incoming_messages_topic",
         pubSub,
         printMessages,
     )

     // صرف ڈیبگنگ مقصد کے لئے، ہم "outgoing_messages_topic" کو موجود واقعات چھاپتے ہیں
     router.AddNoPublisherHandler(
         "print_outgoing_messages",
         "outgoing_messages_topic",
         pubSub,
         printMessages,
     )

     // اب جب تمام ہینڈلرز رجسٹر کر لیے گئے ہیں، ہم راوٹر کو دوڑا سکتے ہیں۔
     // رن بلاک ہو گا جب تک راوٹر دوڑتے رہتے ہیں۔
// ...

دستیاب میڈیویر

یہاں واٹرمل کے قابل اشتمال میڈیویر پیش کیے گئے ہیں، اور آپ خود بھی آسانی سے اپنا میڈیویر پیش کرسکتے ہیں۔ مثال کے طور پر، اگر آپ ہر داخلی پیغام کو مخصوص قسم کی لاگ فارمیٹ میں ذخیرہ کرنا چاہتے ہیں تو یہ سب سے بہترین طریقہ ہے۔

سرکٹ بریکر

// سرکٹ بریکر ہینڈلر کو سرکٹ بریکر میں لپیٹنے والا ایک مڈیویئر ہے۔
// تشکیل کے مطابق, اگر ہینڈلر مسلسل خرابیاں واپس کرنے جارہا ہو تو سرکٹ بریکر فوراً ناکام ہوجائے گا۔
// یہ کسکیڈنگ نقصانات سے بچانے کے لیے کارآمد ہے۔
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker نیا CircuitBreaker مڈیویئر واپس کرتا ہے۔
// دستیاب ترتیبات کے لیے, براہ کرم gobreaker دستاویزات پر رجوع کریں۔
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware سرکٹ بریکر مڈیویئر واپس کرتا ہے۔
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

تعلق

// SetCorrelationID پیغام کے لیے تعلقی شناخت کو تعین کرتا ہے۔
//
// جب ایک پیغام نظام میں داخل ہوتا ہے, تعلقی شناخت کو کال کرنا چاہئے۔
// جب ایک درخواست میں پیغام پیدا ہوتا ہے (مثال کے طور پر, HTTP میں), پیغام کا تعلقی شناخت درخواست کا تعلقی شناخت ہونا چاہئے۔
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID پیغام سے تعلقی شناخت کو واپس کرتا ہے۔
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID ہینڈلر کے ذریعہ پیدا کردہ تمام پیغامات میں ایک تعلقی شناخت شامل کرتا ہے۔
// تعلقی شناخت پیغام کے مواقع سے حاصل آئی ڈی پر مبنی ہے۔
//
// تعلقی شناخت کے صحیح کام کے لیے, پہلے پیغام سسٹم میں داخل ہونے کے لیے SetCorrelationID کو پہلے کال کرنا چاہئے۔
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

Duplicator

// Duplicator پیغام کو دوبارہ کام یکساں ہونے کے لیے دو بار پروسیس کرتا ہے۔
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

اندوز غلطیاں نظر انداز کریں

// انگور ایررز ہینڈلر کو کچھ خاص طور پر معین کردہ غلطیوں کو نظر انداز کرنے کا ایک مڈیویئر فراہم کرتا ہے۔
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors نئی IgnoreErrors مڈیویئر بناتا ہے۔
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware نظر انداز کرنے والا ایگز مڈیویئر واپس کرتا ہے۔
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

فوری تسلیم

// فوری تسلیم ہینڈلر کو بلاواسطہ طور پر آنے والے پیغام کو فوراً تسلیم کرتا ہے, بغیر کسی بھی غلطی کے اثر کے۔
// یہ ترتیب سے میسار کرنے کے لیے استعمال کیا جا سکتا ہے, لیکن بدلے میں:
// اگر آپ کو یقینی کرنا ہوتا ہے کہ صرف ایک بار تسلیم ہوتا ہے, تو آپ کم سے کم ایک بار تسلیم حاصل کر سکتے ہیں۔
// اگر آپ کو منظم پیغامات کی ضرورت ہوتی ہے, تو یہ منظم کرنے میں معکوس ہوسکتا ہے۔
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

زہر

// PoisonQueue پیش کرتا ہے ایک میڈل ویئر فیچر جو غیر قابل پراسیس کپی کرنے والے پیغامات کا سلوک کرتا ہے اور انہیں ایک الگ ٹاپک کو شائع کرتا ہے۔
// پھر، اصل میڈل ویئر چین جاری رہتا ہے، اور کاروبار معمول کے طور پر آگے بڑھتا ہے۔
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter PoisonQueue کے مشابہ ہے، لیکن یہ ایک فنکشن کو قبول کرتا ہے جو زہریلی قطار کی معیار تعین کرتا ہے۔
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

بے قاعدہ ناکامی

// RandomFail ہینڈلر کو ایک بے قاعدہ احتمال کی بنیاد پر ناکام بناتا ہے۔ خرابی کا احتمال وقفے (0، 1) کے اندر ہونا چاہئے۔
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("ایک بے قاعدہ خرابی واقع ہوگئی")
			}
			return h(message)
		}
	}
}

// RandomPanic ہینڈلر کو ایک بے قاعدہ احتمال کی بنیاد پر پینک کرنے کا باعث بناتا ہے۔ پینک کا احتمال وقفے (0، 1) کے اندر ہونا چاہئے۔
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("ایک بے قاعدہ پینک واقع ہوگیا")
			}
			return h(message)
		}
	}
}

بازیاب کنندہ

// RecoveredPanicError بازیاب شدہ پینک کی خرابی اور اس کی اسٹیک ٹریس معلومات کو نگرانی کرتا ہے۔
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer ہینڈلر سے کسی بھی پینک کو بازیاب کرتا ہے اور RecoveredPanicError کو ایرر سے جڑے میسج کے ساتھ منسلک کرتا ہے۔
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

دوبارہ کوشش

// دوبارہ کوشش ایک میڈل ویئر فراہم کرتا ہے جو اگر کوئی خرابی واپسی کرتا ہے تو ہینڈلر کو دوبارہ کوشش کرتا ہے۔
// دوبارہ کوشش کا رویہ، کُل وقت اور زیادہ سے زیادہ مقت وقت تشکیل دیا جا سکتا ہے۔
type Retry struct {
	// MaxRetries بنانے کے لئے زیادہ سے زیادہ کوششوں کی تعداد ہے۔
	MaxRetries int

	// InitialInterval دوبارہ کوشش کے درمیان ابتدائی وقفے کو ظاہر کرتا ہے۔ متعاقب وقفے منسلک کرداروں بھی بنیاد بنائیں گے۔
	InitialInterval time.Duration
	// زیادہ دوری جدول کی زیادہ حد مقرر کرتا ہے۔
	MaxInterval time.Duration
	// Multiplier وقفے کو دوبارہ استعادہ کرنے کے درمیان ضربیہ ہے۔
	Multiplier float64
	// MaxElapsedTime دوبارہ کوشش کے لئے زیادہ سے زیادہ وقت کی حد مقرر کرتا ہے۔ اگر 0 ہو، تو یہ غیر فعال ہوگا۔
	MaxElapsedTime time.Duration
	// RandomizationFactor درمیانی وقفے میں تصادفی طور پر فیلا کرکے درج ذیل حد میں وقفہ پھیلاتا ہے:
	// [موجودہ اوقات * (1 - رینڈمائزیشن فیکٹر), موجودہ اوقات * (1 + رینڈمائزیشن فیکٹر)]۔
	RandomizationFactor float64

	// OnRetryHook ہر دوبارہ کوشش کے کوشش پر نمائندے کرنے کے لئے ایک اختیاری تعلق ہے۔
	// فیشن ڈوبارہ کوشش شمار کیا جاتا ہے۔
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// میڈل ویئر واپسی کرتا ہے
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

تھراٹل

// تھراٹل میڈل ویئر فراہم کرتا ہے جو مخصوص وقت میں پیغامات کی تعداد کو محدود کرتا ہے۔
// یہ اسے استعمال کیا جا سکتا ہے تاکہ غیر موصول لمبی قطار پر چل رہے ہینڈلروں کو بھرماری سے روکا جا سکے۔
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle ایک نیا تھراٹل میڈل ویئر بناتا ہے۔
// نمونہ مدت اور تعداد: NewThrottle(10, time.Second) کا کہنا ہے کہ 10 پیغام ہر سیکنڈ۔
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// میڈل ویئر واپسی کرتا ہے
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// مشترک ہینڈلروں کی طرف سے ان کی "ٹکٹ" کا انتظار کریں گے۔

وقفہ

// وقفہ مقرر کرتا ہے کہ مخصوص دورانیہ کے بعد آنے والے پیغام کے سکوپ کو منسوخ کر دیتا ہے۔
// ہینڈلر کی کسی بھی وقت کی عملیتوں کو یہ معلوم کرنے کے لئے کہ کب ناکام ہونا چاہئے، msg.Context().Done() سنوارنا چاہئے۔
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}