مقدمه
میدلویر به کار میرود تا چارچوب رویداد را گسترش دهد، قابلیتهای سفارشی را اضافه کند، و قابلیتهای مهمی که به منطق دستور اصلی مربوط نیستند را فراهم کند. به عنوان مثال، تکرار اجرای دستور پس از بازگشت خطا، یا بازیابی از وقوع خطا و ضبط ردیابی استک در داخل دستور.
امضای تابع میدلویر به صورت زیر تعریف شده است:
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware به ما امکان میدهد تا مانند دستوردهنده، دکوراتورهای مشابه دستور را بنویسیم.
// این میتواند برخی عملیات را قبل از دستور (مانند تغییر پیام مصرف شده) انجام دهد
// و همچنین برخی عملیات را بعد از دستور (تغییر پیام تولید شده، ACK/NACK پیام مصرفی، مدیریت خطا، ثبت وقایع و غیره) انجام دهد.
//
// میتوان آن را به روتر متصل کرد با استفاده از روش `AddMiddleware`.
//
// مثال:
//
// func ExampleMiddleware(h HandlerFunc) HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("قبل از اجرای دستور")
// producedMessages, err := h(message)
// fmt.Println("بعد از اجرای دستور")
//
// return producedMessages, err
// }
// }
نوع HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
استفاده
میان افزار میتواند بر روی تمام دستگیرهها در مسیریاب یا بر روی دستگیرههای خاص اعمال شود. وقتی میان افزار به صورت مستقیم به مسیریاب اضافه میشود، بر روی تمام دستگیرههای ارائه شده برای مسیریاب اعمال میشود. اگر یک میان افزار فقط بر روی یک دستگیره خاص اعمال شود، باید به دستگیره در مسیریاب اضافه شود.
یک مثال از استفاده:
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// وقتی سیگنال SIGTERM دریافت میشود، SignalsHandler به صورت منظم مسیریاب را بسته خواهد کرد.
// همچنین میتوانید مسیریاب را با فراخوانی `r.Close()` نیز ببندید.
router.AddPlugin(plugin.SignalsHandler)
// میان افزار سطح مسیریاب بر روی هر پیام ارسالی به مسیریاب اجرا خواهد شد
router.AddMiddleware(
// CorrelationID کد همخوانی را از متاداده پیام ورودی به پیام ساخته شده کپی میکند
middleware.CorrelationID,
// در صورتی که دستگیره خطا بازگرداند، دوباره امتحان میکند.
// حداکثر تعداد امتحانها MaxRetries بار خواهد بود، بعد از این تعداد پیام Nacked خواهد شد و توسط PubSub مجدداً ارسال خواهد شد.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer با خطاهای در دستگیره برخورده میکند.
// در این حالت، آنها را به عنوان خطاها به میان افزار Retry منتقل میکند.
middleware.Recoverer,
)
// برای سادگی، در اینجا از ایمپلمنتاسیون gochannel Pub/Sub استفاده میکنیم
// میتوانید آن را با هر ایمپلمنتاسیون Pub/Sub دیگری جایگزین کنید و همچنان کار میکند.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// انتشار برخی از پیامهای ورودی به صورت پسزمینه
go publishMessages(pubSub)
// AddHandler یک دستگیره برمیگرداند که میتواند برای اضافه کردن میان افزار سطح دستگیره یا متوقف کردن دستگیره استفاده شود.
handler := router.AddHandler(
"struct_handler", // نام دستگیره، باید یکتا باشد
"incoming_messages_topic", // تاپیکی که رویدادها از آن خوانده خواهند شد
pubSub,
"outgoing_messages_topic", // تاپیکی که رویدادها در آن منتشر خواهند شد
pubSub,
structHandler{}.Handler,
)
// میان افزار سطح دستگیره فقط برای دستگیرههای خاص اجرا میشود
// اینگونه میان افزار میتواند به دستگیره به همان روشی که میان افزار سطح مسیریاب اضافه میشود اضافه شود
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("اجرای میان افزار خاص دستگیره برای", message.UUID)
return h(message)
}
})
// تنها برای اهداف اشکالزدایی، تمام پیامهای دریافتی در `incoming_messages_topic` را چاپ میکنیم
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// تنها برای اهداف اشکالزدایی، تمام رویدادهای ارسال شده به `outgoing_messages_topic` را چاپ میکنیم
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// حال که تمام دستگیرهها ثبت شدهاند، میتوانیم مسیریاب را اجرا کنیم.
// اجرا تا زمانی که مسیریاب دارد اجرا میشود، مسدود خواهد شد.
// ...
میان افزارهای قابل دسترس
اینجا میان افزارهای قابل استفاده دیگری که توسط Watermill فراهم شدهاند آورده شده است، و میتوانید به راحتی میان افزارهای خود را نیز پیاده سازی کنید. به عنوان مثال، اگر میخواهید هر پیام ورودی را در یک نوع خاص از فرمت لاگ ذخیره کنید، بهترین راه انجام این کار است.
قطاع مدار
// CircuitBreaker یک میانجی است که handler را در یک قطاع مدار میبندد.
// بر اساس پیکربندی، قطاع مدار در صورت ادامه بازگرداندن خطاها تلاش سریع برای شکستن عملکرد handler را انجام می دهد.
// این کار برای جلوگیری از فراگیری خطاها مفید است.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker یک میانجی CircuitBreaker جدید را برمیگرداند.
// برای تنظیمات موجود، لطفا به مستندات gobreaker مراجعه نمایید.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware میانجی CircuitBreaker را برمیگرداند.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
همبستگی
// SetCorrelationID برای پیام ID همبستگی را تعیین میکند.
//
// زمانی که یک پیام وارد سیستم می شود، باید SetCorrelationID فراخوانی شود.
// زمانی که یک پیام در یک درخواست تولید میشود (مانند HTTP)، ID همبستگی پیام باید مشابه ID همبستگی درخواست باشد.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID ID همبستگی را از پیام بازمی گرداند.
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID ID همبستگی را به تمام پیامهای تولیدشده توسط handler اضافه می کند.
// این ID بر اساس پیام ID دریافتی توسط handler است.
//
// برای CorrelationID به درستی کار کند، ابتدا برای وارد شدن پیام به سیستم باید SetCorrelationID فراخوانی شود.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
تکثیرکننده
// Duplicator برای اطمینان از تکرارپذیری نقطهپایان، پیام را دوبار پردازش می کند.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
صرفنظر از خطاها
// IgnoreErrors یک میانجی ارائه می دهد که به handler اجازه می دهد برخی از خطاهای به طور صریح تعریف شده را نادیده بگیرد.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors یک میانجی IgnoreErrors جدید ایجاد میکند.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware میانجی IgnoreErrors را برمیگرداند.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
اقرارفوری
// InstantAck باعث میشود handler بلافاصله پیام ورودی را تایید کند، بدون توجه به هر گونه خطا.
// این میتواند برای بهبود ظرفیت استفاده شود، اما مقایسه آن:
// اگر شما نیاز به اطمینان از تحویل یکبار دقیق دارید، ممکن است حداقل یک بار تحویل بگیرید.
// اگر شما به پیامهای مرتب شده نیاز دارید، ممکن است ترتیب را بشکند.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
سم زهرآگین
// PoisonQueue یک ویژگی میانافزار فراهم میکند تا پیامهای ناقابل پردازش را اداره کرده و آنها را به یک موضوع جداگانه منتشر کند.
// سپس زنجیره میانافزار اصلی ادامه میدهد و فعالیت کسب و کار همانند قبل ادامه مییابد.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter مشابه PoisonQueue است، اما یک تابع را برای تعیین اشکالاتی که بافقدرت سم زهرآگین مطابقت دارند، میپذیرد.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
خرابی تصادفی
// RandomFail باعث شکست دهنده روندگارنده براساس احتمال تصادفی میشود. احتمال خطا باید در محدوده (0، 1) باشد.
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("خطای تصادفی رخ داد")
}
return h(message)
}
}
}
// RandomPanic باعث اعتراض تصادفی روندگارنده براساس احتمال تصادفی میشود. احتمال اعتراض باید در محدوده (0، 1) باشد.
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("اعتراض تصادفی رخ داد")
}
return h(message)
}
}
}
بازیابیکننده
// RecoveredPanicError شامل خطای اعتراض بازیافت شده و اطلاعات ردیابی پشته است.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer هر اعتراض از روندگارنده را بازیابی میکند و RecoveredPanicError را به هر خطایی که از روندگارنده برگردانده شود، الحاق میکند.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
تلاش مجدد
// Retry یک middleware ارائه می دهد که در صورت برگرداندن خطا، تلاش مجدد برای اجرای handler را انجام می دهد.
// رفتار تلاش مجدد، تأخیر نمایی نمایی دوتایی و حداکثر زمان سپری شده قابل پیکربندی است.
type Retry struct {
// MaxRetries حداکثر تعداد تلاش های قابل انجام است.
MaxRetries int
// InitialInterval فاصله ابتدایی بین تلاش ها است. فواصل بعدی توسط ضریب مقیاس داده خواهند شد.
InitialInterval time.Duration
// MaxInterval حداکثر حد بالایی برای تأخیر نمایی دوتایی تلاش ها را تنظیم می کند.
MaxInterval time.Duration
// Multiplier عاملی است که توسط آن فاصله انتظار بین تلاش ها ضرب می شود.
Multiplier float64
// MaxElapsedTime حداکثر زمان مجاز برای تلاش ها را تعیین می کند. اگر صفر باشد، غیرفعال خواهد بود.
MaxElapsedTime time.Duration
// RandomizationFactor به صورت تصادفی زمان تأخیر داده شده را در محدوده زیر پخش می دهد:
// [فاصله_جاری * (1 - عامل_تصادفی), فاصله_جاری * (1 + عامل_تصادفی)].
RandomizationFactor float64
// OnRetryHook یک تابع اختیاری است که بر روی هر تلاش تکراری اجرا خواهد شد.
// شماره تکرار تلاش فعلی از طریق retryNum گذاشته می شود.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware میان افزار Retry را بازمی گرداند.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
کنترل پهنای باند
// Throttle یک middleware ارائه می دهد برای محدود کردن تعداد پیام های پردازش شده در یک دوره زمانی خاص.
// این می تواند برای جلوگیری از بارگذاری بیش از حد handler های در حال اجرا در یک صف بلند باترتیب استفاده شود.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle یک middleware کنترل پهنای باند جدید ایجاد می کند.
// مثال مدت زمان و تعداد: NewThrottle(10, time.Second) بیان می کند 10 پیام در هر ثانیه.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware میان افزار Throttle را بازمی گرداند.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// کنترل پهنای باند هایی که توسط چندین handler به اشتراک گذاشته می شوند منتظر "تیک" های آنها خواهند ماند.
زمان سپری
// Timeout پس از مدت زمان مشخص، محتوای ورودی پیام را نابود می کند.
// هرگونه قابلیت حساس به زمان از پردازنده باید گوش دادن به msg.Context().Done() را داشته باشد تا بداند که موقعیت راه اندازی را کی انجام دهد.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}