บทนำ

Middleware ใช้สำหรับขยายเฟรมเวิร์กเหตุการณ์ ให้ความสามารถที่กำหนดเอง และให้ความสามารถที่สำคัญที่ไม่เกี่ยวข้องกับตรรกะของตัวจัดการหลัก ตัวอย่างเช่น ลองทำการเรียกใช้ตัวจัดการอีกครั้งหลังจากที่ได้รับข้อผิดพลาด หรือกู้คืนจากการขยะและจับทรายไว้ในการจัดการ

ลายเซนเจอร์ของฟังก์ชัน middleware นิยามอย่างไรบางวิธีเช่น: กรุณาดูเพิ่มเติมข้อมูลใน: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware ช่วยให้เราเขียน decorators ที่คล้ายกับ handler
// มันสามารถดำเนินการบางอย่างก่อน handler (ตัวอย่างเช่น แก้ไขข้อความที่ถูกบริโภค)
// และดำเนินการบางอย่างหลังจาก handler (แก้ไขข้อความที่จะผลิต ตอบรับ/ไม่ตอบรับข้อความที่ถูกบริโภค จัดการข้อผิดพลาด การบันทึก ฯลฯ)
//
// สามารถเชื่อต่อกับ router โดยใช้เมธอด `AddMiddleware`
//
// ตัวอย่าง:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("ก่อนที่จะดำเนินการตัวจัดการ")
//			ข้อความที่ผลิต, ข้อผิดพลาด := h(message)
//			fmt.Println("หลังที่ดำเนินการตัวจัดการ")
//
//			กำหนดข้อความที่ผลิต, ข้อผิดพลาด
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

วิธีใช้

มิดเดิลแวร์สามารถนำไปใช้กับทุกฮาน์เดิลในเราเตอร์หรือกับฮาน์เดิลที่เฉพาะเจาะจงได้ ขณะที่มิดเดิลถูกเพิ่มโดยตรงเข้าไปในเราเตอร์ มันจะถูกนำไปใช้กับทุกฮาน์เดิลที่หรือได้รับการจัดพาเจอากของเราเตอร์ ถ้ามิดเดิลถูกนำไปใช้กับฮาน์เดิลที่เฉพาะเจาะจง มันจะต้องถูกเพิ่มเข้าไปในฮาน์เดิลภายในเราเตอร์

นี่คือตัวอย่างของการใช้งาน:

รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// เมื่อได้รับสัญญาณ SIGTERM โดยการใช้ SignalsHandler จะทำการปิดเราเตอร์อย่างสุภาพ
	// คุณยังสามารถปิดเราเตอร์ได้โดยการเรียก `r.Close()`
	router.AddPlugin(plugin.SignalsHandler)

	// มิดเดิลระดับเราเตอร์จะถูกทำงานกับทุกข้อความที่ถูกส่งไปยังเราเตอร์
	router.AddMiddleware(
		// CorrelationID จะคัดลอก correlation ID จาก metadata ของข้อความที่เข้ามาไปยังข้อความที่ถูกสร้างขึ้น
		middleware.CorrelationID,

		// หากฮาน์เดิลส่งคืนข้อผิดพลาด มันจะถูกรีไทร์
		// มันจะถูกรีไทร์ได้มากสุด MaxRetries ครั้ง หลังจากนั้นข้อความนั้นจะถูก Nacked และจะถูกส่งใหม่โดย PubSub
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer จัดการกับการพยายามในฮาน์เดิล
		// ในกรณีนี้ มันจะนำมันไปเป็นข้อผิดพลาดให้กับมิดเดิลรีไทร์
		middleware.Recoverer,
	)

	// เพื่อความง่าย เราใช้ gochannel Pub/Sub ที่นี่
	// คุณสามารถแทนที่ด้วยการปฏิบัติตามใด ๆ ของการปฏิบัติตาม Pub/Sub และมันจะทำงานอย่างเดียวกัน
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// เผยแพร่ข้อความเข้ามาบางส่วนในพื้นหลัง
	go publishMessages(pubSub)

	// AddHandler คืนฮาน์เดิลที่สามารถนำไปใช้เพิ่มมิดเดิลที่ระดับฮาน์เดิล หรือหยุดฮาน์เดิล
	handler := router.AddHandler(
		"struct_handler",          // ชื่อของฮาน์เดิล จะต้องเป็นเอกลักษณ์
		"incoming_messages_topic", // หัวข้อที่เหตุการณ์จะถูกอ่านเข้ามา
		pubSub,
		"outgoing_messages_topic", // หัวข้อที่เหตุการณ์จะถูกเผยแพร่
		pubSub,
		structHandler{}.Handler,
	)

	// มิดเดิลระดับฮาน์เดิลจะถูกทำงานเฉพาะสำหรับฮาน์เดิลที่เฉพาะเจาะจง
	// มิดเดิลเช่นนี้สามารถถูกเพิ่มไปยังฮาน์เดิลได้โดยวิธีเดียวกันกับมิดเดิลระดับเราเตอร์
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("การทำมิดเดิลเฉพาะสำหรับฮาน์เดิลกำลังดำเนินการสำหรับ", message.UUID)

			return h(message)
		}
	})

	// เพียงเพื่อวัตถุประสงค์ในการดีบักเท่านั้น เราพิมพ์ข้อความทั้งหมดที่ได้รับไปยัง `incoming_messages_topic`
	router.AddNoPublisherHandler(
		"พิมพ์ข้อความที่เข้ามา",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// เพียงเพื่อวัตถุประสงค์ในการดีบักเท่านั้น เราพิมพ์เหตุการณ์ทั้งหมดที่ส่งไปยัง `outgoing_messages_topic`
	router.AddNoPublisherHandler(
		"พิมพ์เหตุการณ์ที่ส่งออก",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// ตอนนี้หลังจากที่มีการลงทะเบียนฮาน์เดิลทั้งหมดแล้ว เราสามารถเรียกเราเตอร์ได้
	// การเรียกจะบล็อคจนกว่าเราเตอร์จะหยุดทำงาน
// ...

มิดเดิลที่มีพร้อมให้ใช้งาน

นี่คือมิดเดิลที่สามารถนำมาใช้ใหม่โดย Watermill และคุณยังสามารถสร้างมิดเดิลของคุณเองได้ง่าย ๆ ตัวอย่างเช่น หากคุณต้องการเก็บข้อความที่เข้ามาแต่ละอันในรูปแบบบางแบบของบันทึกนี่คือวิธีที่ดีที่สุดที่จะทำให้มันเป็นไปได้

การตัดสินใจแบบเบรกเกอร์

// CircuitBreaker เป็น middleware ที่ห่อหุ้ม handler ด้วยเบรกเกอร์
// โดยอิงจากการกำหนดค่างานว่าเบรกเกอร์จะทำงานแบบ fast fail หาก handler ทำงานให้เกิด error ต่อเนื่อง
// มีประโยชน์ในการป้องกันการลาดตะเทือนที่กัน
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker สร้าง middleware CircuitBreaker ใหม่
// สำหรับการตั้งค่าที่ใช้สามารถอ้างอิงได้จากเอกสารของ gobreaker
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware คืนค่า CircuitBreaker middleware
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

การสัมพันธ์

// SetCorrelationID ตั้งค่า correlation ID สำหรับข้อความ
//
// เมื่อข้อความเข้าสู่ระบบ SetCorrelationID ควรถูกเรียก
// เมื่อมีการสร้างข้อความในคำขอ (เช่น HTTP) correlation ID ของข้อความควรเหมือนกับ correlation ID ของคำขอ
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID คืนค่า correlation ID จากข้อความ
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID เพิ่ม correlation ID ให้กับข้อความทั้งหมดที่สร้างโดย handler
// รหัสความจุของ ID ขึ้นกับ ID ข้อความที่ได้รับจาก handler
//
// เพื่อให้ CorrelationID ทำงานถูกต้อง SetCorrelationID ต้องถูกเรียกก่อนสำหรับข้อความที่เข้าสู่ระบบ
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

การทำซ้ำ

// Duplicator ประมวลผลข้อความสองครั้งเพื่อให้แน่ใจว่าจุดปลายทางเป็นฟังก์ชันไม่เช่นทำ
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

การละเว้นข้อผิดพลาด

// IgnoreErrors ให้ middleware ที่เรียกรับจาก handler มองเห็น error บางรายการที่ถูกกำหนดไว้โดยชัดเจน
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors สร้าง middleware IgnoreErrors ใหม่
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware คืนค่า IgnoreErrors middleware
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

การยอมรับโดยทันที

// InstantAck ทำให้ handler ยอมรับข้อความที่เข้ามาทันที โดยไม่คำนึงถึงข้อผิดพลาด
// สามารถใช้เพื่อปรับปรุงประสิทธิภาพ แต่การแลกเปลี่ยนคือ:
// หากคุณต้องการให้มั่นใจในการส่งมอบอย่างแท้จริง คุณอาจได้มากกว่าครั้งใด
// หากคุณต้องการข้อความที่มีลำดับ อาจทำให้ลำดับเปลี่ยนการเปลี่ยนลำดับ
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

ยาพิษ

// PoisonQueue ให้คุณสามารถใช้ middleware เพื่อจัดการข้อความที่ไม่สามารถประมวลผลได้ และเผยแพร่ข้อความเหล่านั้นไปยังหัวข้อที่แตกต่าง
// จากนั้น middleware หลักจะต่อการทำงานต่อไปเช่นเดิม
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter เหมือนกับ PoisonQueue แต่ยังรับฟังก์ชันเพื่อกำหนดเงื่อนไขของข้อผิดพลาดที่เหมาะสมกับคิวพิษ
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

ความล้มเหลวแบบสุ่ม

// RandomFail ทำให้ handler ล้มเหลวตามความน่าจะเป็นที่สุ่มได้ ความน่าจะเป็นของข้อผิดพลาดควรอยู่ในช่วง (0, 1)
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("เกิดข้อผิดพลาดแบบสุ่ม")
			}
			return h(message)
		}
	}
}

// RandomPanic ทำให้ handler สร้างการขัดข้องตามความน่าจะเป็นที่สุ่มได้ ความน่าจะเป็นของการขัดข้องควรอยู่ในช่วง (0, 1)
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("เกิดการขัดข้องแบบสุ่ม")
			}
			return h(message)
		}
	}
}

Recoverer

// RecoveredPanicError เก็บข้อผิดพลาดที่กลับมาจากการขัดข้องและข้อมูล stack trace ของมัน
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer กำหนดให้ผู้ใช้ middleware เพื่อกู้คืนการขัดข้องจาก handler และแนบ RecoveredPanicError พร้อมกับ stack trace ไปยังข้อผิดพลาดใด ๆ ที่ส่งกลับมาจาก handler
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

ลองใหม่

// Retry เป็นการให้ middleware ที่ลองใหม่การดำเนินการหากมีข้อผิดพลาดที่ถูกส่งกลับมา
// พฤติกรรมการลองใหม่ การย้อนกลับที่เพิ่มขึ้น เวลาที่ผ่านไปสูงสุดสามารถกำหนดค่าได้
type Retry struct {
	// MaxRetries เป็นจำนวนครั้งสูงสุดที่จะลอง
	MaxRetries int

	// InitialInterval เป็นช่วงเวลาเริ่มต้นระหว่างการลองใหม่ ช่วงเวลาต่อไปจะถูกปรับขนาดโดย Multiplier
	InitialInterval time.Duration
	// MaxInterval กำหนดขีดจำกัดบนสำหรับการย้อนกลับโดยกำหนดเวลาสูงสุด
	MaxInterval time.Duration
	// Multiplier เป็นตัวคูณที่ซึ่งช่วงรอระหว่างการลองใหม่จะถูกคูณ
	Multiplier float64
	// MaxElapsedTime กำหนดขีดจำกัดเวลาสูงสุดสำหรับการลองใหม่ ถ้าเป็น 0 จะไม่ใช้งาน
	MaxElapsedTime time.Duration
	// RandomizationFactor กระจายเวลารอในช่วงดังต่อไปนี้
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook เป็นฟังก์ชันที่ไม่บังคับที่จะทำการดำเนินการทุกครั้งที่ลองใหม่
	// จำนวนครั้งของการลองใหม่ถูกส่งผ่าน retryNum
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware ส่งคืน Retry middleware
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

การจำกัด

// Throttle เป็นการให้ middleware ที่จำกัดจำนวนข้อความที่ถูกระบบประมวลผลภายในระยะเวลาที่กำหนด
// นี้สามารถใช้เพื่อป้องกันการเฝ้ารอคิวที่ยาวและทิ้งไม้ที่ทำงานบนตัวจัดการ
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle สร้าง Throttle middleware ใหม่
// ตัวอย่าง ระยะเวลา และ จำนวน: NewThrottle(10, time.Second) หมายถึง 10 ข้อความต่อวินาที
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware ส่งคืน Throttle middleware
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// Throttles shared by multiple handlers will wait for their "ticks".

การหมดเวลา

// การหมดเวลา ยกเลิกข่อความที่ถูกส่งมาจาก context ที่กำหนดหลังจากระยะเวลาที่ระบุ
// ฟังก์ชันที่อาจต้องการการหมดเวลาของตัวจัดการควรรับฟังต์ชัน msg.Context().Done() เพื่อทราบเมื่อมีความผิดพลาด
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}