المقدمة
يُستخدم الوسيط لتوسيع إطار الحدث، وإضافة الوظائف المخصصة، وتوفير الوظائف المهمة غير ذات الصلة بمنطق المعالج الرئيسي. على سبيل المثال، إعادة محاولة تشغيل المعالج بعد إرجاع خطأ، أو استعادة من الانهيار والتقاط أثر الكومة داخل المعالج.
تم تعريف توقيع وظيفة الوسيط على النحو التالي:
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// يسمح HandlerMiddleware لنا بكتابة الديكورات المماثلة للمعالج.
// يمكنها تنفيذ بعض العمليات قبل المعالج (على سبيل المثال، تعديل الرسالة المستهلكة)
// وأيضًا أداء بعض العمليات بعد المعالج (تعديل الرسالة المنتجة، ACK/NACK للرسالة المستهلكة، معالجة الأخطاء، تسجيل الأحداث، إلخ).
//
// يمكن إرفاقها بالموجه عن طريق استخدام الطريقة `AddMiddleware`.
//
// مثال:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// يرجى func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("قبل تنفيذ المعالج")
// الرسائل_المنتجة, خطأ := h(message)
// fmt.Println("بعد تنفيذ المعالج")
//
// ارجع الرسائل_المنتجة, خطأ
// }
// }
نوع HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
الاستخدام
يمكن تطبيق الوسيطة على كافة المعالجين في الموجه أو على المعالجين المحددين. عند إضافة الوسيطة مباشرة إلى الموجه، سيتم تطبيقها على كافة المعالجين المقدمة للموجه. إذا تم تطبيق الوسيطة فقط على معالج معين، فيجب إضافتها إلى المعالج في الموجه.
إليك مثالاً على الاستخدام:
كود المصدر الكامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// عند استلام إشارة SIGTERM، سيقوم SignalsHandler بإغلاق الموجه بشكلٍ سلس.
// يمكنك أيضًا إغلاق الموجه عن طريق استدعاء `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// سيتم تنفيذ وسائط الموجه على كافة الرسائل المُرسلة إلى الموجه
router.AddMiddleware(
// سيقوم CorrelationID بنسخ معرّف الارتباط من البيانات الواردة في الرسالة إلى الرسالة المولّدة
middleware.CorrelationID,
// إذا عاد المعالج بخطأ، سيتم إعادة محاولته.
// سيتم إعادة المحاولة حتى MaxRetries مرة، بعد ذلك سيتم نفي الرسالة وإعادة إرسالها بواسطة PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// يتعامل Recoverer مع الانهيارات في المعالج.
// في هذه الحالة، ينقلها كأخطاء إلى وسيط الإعادة.
middleware.Recoverer,
)
// لغرض البساطة، نستخدم هنا نظام gochannel Pub/Sub،
// يمكنك استبداله بأي تنفيذ لنظام النشر/الاشتراك وسيعمل بنفس الطريقة.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// نقوم بنشر بعض الرسائل الواردة في الخلفية
go publishMessages(pubSub)
// يُرجع AddHandler معالج يمكن استخدامه لإضافة وسائط على مستوى المعالج
// أو لإيقاف المعالج.
handler := router.AddHandler(
"struct_handler", // اسم المعالج، يجب أن يكون مميزًا
"incoming_messages_topic", // الموضوع الذي سيُقرأ منه الأحداث
pubSub,
"outgoing_messages_topic", // الموضوع الذي ستُنشر إليه الأحداث
pubSub,
structHandler{}.Handler,
)
// لن يُنفذ وسيط المعالج إلا على المعالجات المحددة.
// يمكن إضافة مثل هذه الوسائط إلى المعالج بنفس الطريقة التي تمت بها إضافة وسائط الموجه
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("تنفيذ وسائط المعالج المحددة للرسالة", message.UUID)
return h(message)
}
})
// لأغراض التصحيح فقط، نقوم بطباعة كافة الرسائل المُستلمة على `incoming_messages_topic`
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// لأغراض التصحيح فقط، نقوم بطباعة كافة الأحداث المُرسلة إلى `outgoing_messages_topic`
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// الآن بعد تسجيل كافة المعالجات، يمكننا تشغيل الموجه.
// Run سيعطل حتى يتوقف تشغيل الموجه.
// ...
الوسائط المتاحة
ها هي الوسائط القابلة لإعادة الاستخدام المقدمة من واترميل، ويمكنك أيضًا تنفيذ وسائط مُخصصة بسهولة. على سبيل المثال، إذا أردت تخزين كل رسالة واردة في نوع معين من تنسيق السجلات، فهذا هو أفضل طريقة لفعل ذلك.
كسر الدائرة (Circuit Breaker)
// كسرالدائرة هو وسيط يلف المعالج بكسر اتصال.
// بناءً على التكوين، سيقوم كسر الدائرة بالفشل السريع إذا استمر المعالج في إرجاع الأخطاء.
// هذا مفيد لمنع الأخطاء التتابعية.
نوع كسرالدائرة هو struct {
كسرالدائرة)
}
// NewCircuitBreaker يعيد وسيط كسرالدائرة الجديد.
// للحصول على الإعدادات المتاحة، يرجى الرجوع إلى وثائق gobreaker.
دالة NewCircuitBreaker يعيد كسرالدائرة الجديد{
كسرالدائرة) {
cb: gobreaker.NewCircuitBreaker(الإعدادات),
}
}
// يعيد المنتصف وسيط كسرالدائرة.
دالة (الب) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := الب.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return النتيجة، أو
}
}
الترابط (Correlation)
// يضع SetCorrelationID معرف الترابط للرسالة.
//
// عندما تدخل رسالة النظام، يجب استدعاء SetCorrelationID.
// عند إنشاء رسالة في طلب (على سبيل المثال، HTTP)، يجب أن يكون معرف الترابط للرسالة نفس معرف الترابط للطلب.
دالة SetCorrelationID يضع (id string، رسالة *message.Message) {
إذا كان MessageCorrelationID(msg) != "" {
العودة
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// يعيد MessageCorrelationID معرف الترابط من الرسالة.
دالة MessageCorrelationID يعيد معرف الترابط من الرسالة *message.Message) سلسلة {
يعود message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID يضيف معرف ترابط إلى جميع الرسائل التي تم إنشاؤها بواسطة المعالج.
// يعتمد معرف الرسالة الصادرة عن معرف الرسالة الذي تلقاه المعالج.
//
// من أجل أن يعمل CorrelationID بشكل صحيح، يجب أن يتم استدعاء SetCorrelationID أولاً للرسالة لدخول النظام.
دالة CorrelationID يعيد (h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
الرسائل المنتجة، الخطأ := h(message)
معرف الترابط := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return الرسائل المنتجة، الخطأ
}
}
مكرر (Duplicator)
// مكرر it يعالج الرسالة مرتين لضمان أن نقطة النهاية هي متعددة الاتجاهات.
دالة مكرر(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
إذا كان أول خطا != nil {
العودة الصفر، الخطا الأول
}
secondProducedMessages, secondErr := h(msg)
إذا كان الخطأ الثاني != nil {
العودة الصفر، الخطأ الثاني
}
الرجوع إلى إضافة(firstProducedMessages، secondProducedMessages...)، أو
}
}
تجاهل الأخطاء (Ignore Errors)
// يوفر IgnoreErrors وسيط يسمح للمعالج بتجاهل بعض الأخطاء المعرفة صراحة.
نوع IgnoreErrors هو struct {
ignoredErrors map[string]struct{}
}
// ينشئ NewIgnoreErrors وسيط IgnoreErrors جديد.
دالة NewIgnoreErrors ينشئ IgnoreErrors جديد (الأخطاء []error) {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
الرجوع إلى IgnoreErrors{ errsMap }
}
// يعيد الوسيط IgnoreErrors.
دالة (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
يعود func(msg *message.Message) ([]*message.Message, error) {
الأحداث، الخطأ := h(msg)
إذا كان الخطأ != nil {
إذا كان، صحيح := i.ignoredErrors[errors.Cause(err).Error()]; صحيح {
العودة الأحداث، الصفر
}
العودة الأحداث، الخطأ
}
العودة الأحداث، الصفر
}
}
الاعتراف الفوري (Instant Ack)
// يجعل InstantAck المعالج يعترف فورًا بالرسالة الواردة، بغض النظر عن أي أخطاء.
// يمكن استخدامه لتحسين الإنتاجية، ولكن الصراحة هي:
// إذا كنت بحاجة إلى ضمان تسليم مرة واحدة بالضبط، قد تحصل على تسليم مرة واحدة على الأقل.
// إذا كنت بحاجة إلى رسائل مرتبة، قد تفسد الترتيب.
دالة InstantAck يجعل (h message.HandlerFunc) message.HandlerFunc {
يعود func (message *message.Message) ([]*message.Message, error) {
message.Ack()
العودة h(message)
}
}
السام
// PoisonQueue يوفر ميزة وسيطية للتعامل مع الرسائل التي لا يمكن معالجتها ونشرها إلى موضوع منفصل.
// ثم يستمر سلسلة الوسيط الرئيسية في التنفيذ، ويستمر العمل كالمعتاد.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter مشابه لـ PoisonQueue، ولكنه يقبل دالة لتحديد أي الأخطاء تستوفي معايير الطابور السام.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
الفشل العشوائي
// RandomFail يُسبب فشل المعالج بناءً على احتمالية عشوائية. يجب أن تكون احتمالية الخطأ ضمن النطاق (0، 1).
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("حدث خطأ عشوائي")
}
return h(message)
}
}
}
// RandomPanic يسبب المعالج في الدوام بناءً على احتمالية عشوائية. يجب أن تكون احتمالية الانهيار ضمن النطاق (0، 1).
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("حدث انهيار عشوائي")
}
return h(message)
}
}
}
استرداد
// RecoveredPanicError يحتوي على خطأ الانهيار المستعاد ومعلومات تتبع الكيس.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer يستعيد أي انهيار من المعالج ويرفق RecoveredPanicError بتتبع الكيس مع أي خطأ يتم إرجاعه من المعالج.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
إعادة المحاولة
// يوفر Retry وسيطًا يُعيد تشغيل المعالج إذا تم إرجاع خطأ.
// يمكن تكوين سلوك المحاولة المعيّنة ورجوع الأولى، والزمن الأقصى المستغرق.
type Retry struct {
// MaxRetries هو الحد الأقصى للمحاولات التي يجب إجراؤها.
MaxRetries int
// InitialInterval هو الفاصل الزمني الأولي بين عمليات المحاولة. سيتم تدريج الفواصل الزمنية التالية بواسطة المضاعف.
InitialInterval time.Duration
// MaxInterval يضبط الحد الأعلى لرجوع الأولى المتسلسل.
MaxInterval time.Duration
// Multiplier هو العامل الذي سيتم ضربه في فاصل الانتظار بين المحاولات.
Multiplier float64
// MaxElapsedTime يضبط الحد الزمني الأقصى للمحاولات. إذا كانت قيمة 0، فإنه معطل.
MaxElapsedTime time.Duration
// RandomizationFactor ينتشر الزمن المتأخر داخل النطاق التالي:
// [الفاصل_الزمني_الحالي * (1 - عامل_التعشيش_العشوائي), الفاصل_الزمني_الحالي * (1 + عامل_التعشيش_العشوائي)].
RandomizationFactor float64
// OnRetryHook هو وظيفة اختيارية يتم تنفيذها في كل محاولة لإعادة المحاولة.
// يتم تمرير رقم إعادة المحاولة الحالية من خلال retryNum.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware يعيد وسيط Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
الضبط
// Throttle يوفر وسيطًا لتقييد عدد الرسائل المعالجة خلال فترة زمنية معينة.
// يمكن استخدام هذا لمنع تحميل المعالجين الذين يعملون على قائمة انتظار طويلة غير معالجة.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle ينشئ وسيط Throttle جديدًا.
// مثال على الزمن والعدّ: NewThrottle(10، time.Second) يشير إلى 10 رسائل في الثانية.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware يُعيد وسيط الضبط.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// ينتظر الأواصر المشتركة بواسطة معالجين متعددين "نقراتها".
الحد الزمني
// يلغي Timeout سياق الرسالة الوارد بعد المدة المحددة.
// يجب على وظائف المعالج الحساسة للحد الزمني أن تستمع إلى msg.Context().Done() لمعرفة متى يجب الفشل.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}