مقدمه

میدل‌ویر به کار می‌رود تا چارچوب رویداد را گسترش دهد، قابلیت‌های سفارشی را اضافه کند، و قابلیت‌های مهمی که به منطق دستور اصلی مربوط نیستند را فراهم کند. به عنوان مثال، تکرار اجرای دستور پس از بازگشت خطا، یا بازیابی از وقوع خطا و ضبط ردیابی استک در داخل دستور.

امضای تابع میدل‌ویر به صورت زیر تعریف شده است:

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware به ما امکان می‌دهد تا مانند دستور‌دهنده، دکوراتورهای مشابه دستور را بنویسیم.
// این می‌تواند برخی عملیات را قبل از دستور (مانند تغییر پیام مصرف شده) انجام دهد
// و همچنین برخی عملیات را بعد از دستور (تغییر پیام تولید شده، ACK/NACK پیام مصرفی، مدیریت خطا، ثبت وقایع و غیره) انجام دهد.
//
// می‌توان آن را به روتر متصل کرد با استفاده از روش `AddMiddleware`.
//
// مثال:
//
//	func ExampleMiddleware(h HandlerFunc) HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("قبل از اجرای دستور")
//			producedMessages, err := h(message)
//			fmt.Println("بعد از اجرای دستور")
//
//			return producedMessages, err
//		}
//	}
نوع HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

استفاده

میان افزار می‌تواند بر روی تمام دستگیره‌ها در مسیریاب یا بر روی دستگیره‌های خاص اعمال شود. وقتی میان افزار به صورت مستقیم به مسیریاب اضافه می‌شود، بر روی تمام دستگیره‌های ارائه شده برای مسیریاب اعمال می‌شود. اگر یک میان افزار فقط بر روی یک دستگیره خاص اعمال شود، باید به دستگیره در مسیریاب اضافه شود.

یک مثال از استفاده:

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// وقتی سیگنال SIGTERM دریافت می‌شود، SignalsHandler به صورت منظم مسیریاب را بسته خواهد کرد.
	// همچنین می‌توانید مسیریاب را با فراخوانی `r.Close()` نیز ببندید.
	router.AddPlugin(plugin.SignalsHandler)

	// میان افزار سطح مسیریاب بر روی هر پیام ارسالی به مسیریاب اجرا خواهد شد
	router.AddMiddleware(
		// CorrelationID کد همخوانی را از متاداده پیام ورودی به پیام ساخته شده کپی می‌کند
		middleware.CorrelationID,

		// در صورتی که دستگیره خطا بازگرداند، دوباره امتحان می‌کند.
		// حداکثر تعداد امتحان‌ها MaxRetries بار خواهد بود، بعد از این تعداد پیام Nacked خواهد شد و توسط PubSub مجدداً ارسال خواهد شد.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer با خطاهای در دستگیره برخورده می‌کند.
		// در این حالت، آنها را به عنوان خطاها به میان افزار Retry منتقل می‌کند.
		middleware.Recoverer,
	)

	// برای سادگی، در اینجا از ایمپلمنتاسیون gochannel Pub/Sub استفاده می‌کنیم
	// می‌توانید آن را با هر ایمپلمنتاسیون Pub/Sub دیگری جایگزین کنید و همچنان کار می‌کند.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// انتشار برخی از پیام‌های ورودی به صورت پس‌زمینه
	go publishMessages(pubSub)

	// AddHandler یک دستگیره برمی‌گرداند که می‌تواند برای اضافه کردن میان افزار سطح دستگیره یا متوقف کردن دستگیره استفاده شود.
	handler := router.AddHandler(
		"struct_handler",          // نام دستگیره، باید یکتا باشد
		"incoming_messages_topic", // تاپیکی که رویدادها از آن خوانده خواهند شد
		pubSub,
		"outgoing_messages_topic", // تاپیکی که رویدادها در آن منتشر خواهند شد
		pubSub,
		structHandler{}.Handler,
	)

	// میان افزار سطح دستگیره فقط برای دستگیره‌های خاص اجرا می‌شود
	// اینگونه میان افزار می‌تواند به دستگیره به همان روشی که میان افزار سطح مسیریاب اضافه می‌شود اضافه شود
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("اجرای میان افزار خاص دستگیره برای", message.UUID)

			return h(message)
		}
	})

	// تنها برای اهداف اشکال‌زدایی، تمام پیام‌های دریافتی در `incoming_messages_topic` را چاپ می‌کنیم
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// تنها برای اهداف اشکال‌زدایی، تمام رویدادهای ارسال شده به `outgoing_messages_topic` را چاپ می‌کنیم
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// حال که تمام دستگیره‌ها ثبت شده‌اند، می‌توانیم مسیریاب را اجرا کنیم.
	// اجرا تا زمانی که مسیریاب دارد اجرا می‌شود، مسدود خواهد شد.
// ...

میان افزارهای قابل دسترس

اینجا میان افزارهای قابل استفاده دیگری که توسط Watermill فراهم شده‌اند آورده شده است، و می‌توانید به راحتی میان افزارهای خود را نیز پیاده سازی کنید. به عنوان مثال، اگر می‌خواهید هر پیام ورودی را در یک نوع خاص از فرمت لاگ ذخیره کنید، بهترین راه انجام این کار است.

قطاع مدار

// CircuitBreaker یک میانجی است که handler را در یک قطاع مدار می‌بندد.
// بر اساس پیکربندی، قطاع مدار در صورت ادامه بازگرداندن خطاها تلاش سریع برای شکستن عملکرد handler را انجام می دهد.
// این کار برای جلوگیری از فراگیری خطاها مفید است.
type CircuitBreaker struct {
	cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker یک میانجی CircuitBreaker جدید را برمی‌گرداند.
// برای تنظیمات موجود، لطفا به مستندات gobreaker مراجعه نمایید.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware میانجی CircuitBreaker را برمی‌گرداند.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

همبستگی

// SetCorrelationID برای پیام ID همبستگی را تعیین می‌کند.
//
// زمانی که یک پیام وارد سیستم می شود، باید SetCorrelationID فراخوانی شود.
// زمانی که یک پیام در یک درخواست تولید می‌شود (مانند HTTP)، ID همبستگی پیام باید مشابه ID همبستگی درخواست باشد.
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID ID همبستگی را از پیام بازمی گرداند.
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID ID همبستگی را به تمام پیام‌های تولیدشده توسط handler اضافه می کند.
// این ID بر اساس پیام ID دریافتی توسط handler است.
//
// برای CorrelationID به درستی کار کند، ابتدا برای وارد شدن پیام به سیستم باید SetCorrelationID فراخوانی شود.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

تکثیرکننده

// Duplicator برای اطمینان از تکرارپذیری نقطه‌پایان، پیام را دوبار پردازش می کند.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

صرف‌نظر از خطاها

// IgnoreErrors یک میانجی ارائه می دهد که به handler اجازه می دهد برخی از خطاهای به طور صریح تعریف شده را نادیده بگیرد.
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors یک میانجی IgnoreErrors جدید ایجاد می‌کند.
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware میانجی IgnoreErrors را برمی‌گرداند.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

اقرارفوری

// InstantAck باعث می‌شود handler بلافاصله پیام ورودی را تایید کند، بدون توجه به هر گونه خطا.
// این می‌تواند برای بهبود ظرفیت استفاده شود، اما مقایسه آن:
// اگر شما نیاز به اطمینان از تحویل یکبار دقیق دارید، ممکن است حداقل یک بار تحویل بگیرید.
// اگر شما به پیام‌های مرتب شده نیاز دارید، ممکن است ترتیب را بشکند.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

سم زهرآگین

// PoisonQueue یک ویژگی میان‌افزار فراهم می‌کند تا پیام‌های ناقابل پردازش را اداره کرده و آن‌ها را به یک موضوع جداگانه منتشر کند.
// سپس زنجیره میان‌افزار اصلی ادامه می‌دهد و فعالیت کسب و کار همانند قبل ادامه می‌یابد.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter مشابه PoisonQueue است، اما یک تابع را برای تعیین اشکالاتی که بافقدرت سم زهرآگین مطابقت دارند، می‌پذیرد.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

خرابی تصادفی

// RandomFail باعث شکست دهنده روندگارنده براساس احتمال تصادفی می‌شود. احتمال خطا باید در محدوده (0، 1) باشد.
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("خطای تصادفی رخ داد")
			}
			return h(message)
		}
	}
}

// RandomPanic باعث اعتراض تصادفی روندگارنده براساس احتمال تصادفی می‌شود. احتمال اعتراض باید در محدوده (0، 1) باشد.
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("اعتراض تصادفی رخ داد")
			}
			return h(message)
		}
	}
}

بازیابی‌کننده

// RecoveredPanicError شامل خطای اعتراض بازیافت شده و اطلاعات ردیابی پشته است.
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer هر اعتراض از روندگارنده را بازیابی می‌کند و RecoveredPanicError را به هر خطایی که از روندگارنده برگردانده شود، الحاق می‌کند.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

تلاش مجدد

// Retry یک middleware ارائه می دهد که در صورت برگرداندن خطا، تلاش مجدد برای اجرای handler را انجام می دهد.
// رفتار تلاش مجدد، تأخیر نمایی نمایی دوتایی و حداکثر زمان سپری شده قابل پیکربندی است.
type Retry struct {
	// MaxRetries حداکثر تعداد تلاش های قابل انجام است.
	MaxRetries int

	// InitialInterval فاصله ابتدایی بین تلاش ها است. فواصل بعدی توسط ضریب مقیاس داده خواهند شد.
	InitialInterval time.Duration
	// MaxInterval حداکثر حد بالایی برای تأخیر نمایی دوتایی تلاش ها را تنظیم می کند.
	MaxInterval time.Duration
	// Multiplier عاملی است که توسط آن فاصله انتظار بین تلاش ها ضرب می شود.
	Multiplier float64
	// MaxElapsedTime حداکثر زمان مجاز برای تلاش ها را تعیین می کند. اگر صفر باشد، غیرفعال خواهد بود.
	MaxElapsedTime time.Duration
	// RandomizationFactor به صورت تصادفی زمان تأخیر داده شده را در محدوده زیر پخش می دهد:
	// [فاصله_جاری * (1 - عامل_تصادفی), فاصله_جاری * (1 + عامل_تصادفی)].
	RandomizationFactor float64

	// OnRetryHook یک تابع اختیاری است که بر روی هر تلاش تکراری اجرا خواهد شد.
	// شماره تکرار تلاش فعلی از طریق retryNum گذاشته می شود.
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware میان افزار Retry را بازمی گرداند.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

کنترل پهنای باند

// Throttle یک middleware ارائه می دهد برای محدود کردن تعداد پیام های پردازش شده در یک دوره زمانی خاص.
// این می تواند برای جلوگیری از بارگذاری بیش از حد handler های در حال اجرا در یک صف بلند باترتیب استفاده شود.
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle یک middleware کنترل پهنای باند جدید ایجاد می کند.
// مثال مدت زمان و تعداد: NewThrottle(10, time.Second) بیان می کند 10 پیام در هر ثانیه.
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware میان افزار Throttle را بازمی گرداند.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// کنترل پهنای باند هایی که توسط چندین handler به اشتراک گذاشته می شوند منتظر "تیک" های آنها خواهند ماند.

زمان سپری

// Timeout پس از مدت زمان مشخص، محتوای ورودی پیام را نابود می کند.
// هرگونه قابلیت حساس به زمان از پردازنده باید گوش دادن به msg.Context().Done() را داشته باشد تا بداند که موقعیت راه اندازی را کی انجام دهد.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}