บทนำ
Middleware ใช้สำหรับขยายเฟรมเวิร์กเหตุการณ์ ให้ความสามารถที่กำหนดเอง และให้ความสามารถที่สำคัญที่ไม่เกี่ยวข้องกับตรรกะของตัวจัดการหลัก ตัวอย่างเช่น ลองทำการเรียกใช้ตัวจัดการอีกครั้งหลังจากที่ได้รับข้อผิดพลาด หรือกู้คืนจากการขยะและจับทรายไว้ในการจัดการ
ลายเซนเจอร์ของฟังก์ชัน middleware นิยามอย่างไรบางวิธีเช่น: กรุณาดูเพิ่มเติมข้อมูลใน: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware ช่วยให้เราเขียน decorators ที่คล้ายกับ handler
// มันสามารถดำเนินการบางอย่างก่อน handler (ตัวอย่างเช่น แก้ไขข้อความที่ถูกบริโภค)
// และดำเนินการบางอย่างหลังจาก handler (แก้ไขข้อความที่จะผลิต ตอบรับ/ไม่ตอบรับข้อความที่ถูกบริโภค จัดการข้อผิดพลาด การบันทึก ฯลฯ)
//
// สามารถเชื่อต่อกับ router โดยใช้เมธอด `AddMiddleware`
//
// ตัวอย่าง:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("ก่อนที่จะดำเนินการตัวจัดการ")
// ข้อความที่ผลิต, ข้อผิดพลาด := h(message)
// fmt.Println("หลังที่ดำเนินการตัวจัดการ")
//
// กำหนดข้อความที่ผลิต, ข้อผิดพลาด
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
วิธีใช้
มิดเดิลแวร์สามารถนำไปใช้กับทุกฮาน์เดิลในเราเตอร์หรือกับฮาน์เดิลที่เฉพาะเจาะจงได้ ขณะที่มิดเดิลถูกเพิ่มโดยตรงเข้าไปในเราเตอร์ มันจะถูกนำไปใช้กับทุกฮาน์เดิลที่หรือได้รับการจัดพาเจอากของเราเตอร์ ถ้ามิดเดิลถูกนำไปใช้กับฮาน์เดิลที่เฉพาะเจาะจง มันจะต้องถูกเพิ่มเข้าไปในฮาน์เดิลภายในเราเตอร์
นี่คือตัวอย่างของการใช้งาน:
รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// เมื่อได้รับสัญญาณ SIGTERM โดยการใช้ SignalsHandler จะทำการปิดเราเตอร์อย่างสุภาพ
// คุณยังสามารถปิดเราเตอร์ได้โดยการเรียก `r.Close()`
router.AddPlugin(plugin.SignalsHandler)
// มิดเดิลระดับเราเตอร์จะถูกทำงานกับทุกข้อความที่ถูกส่งไปยังเราเตอร์
router.AddMiddleware(
// CorrelationID จะคัดลอก correlation ID จาก metadata ของข้อความที่เข้ามาไปยังข้อความที่ถูกสร้างขึ้น
middleware.CorrelationID,
// หากฮาน์เดิลส่งคืนข้อผิดพลาด มันจะถูกรีไทร์
// มันจะถูกรีไทร์ได้มากสุด MaxRetries ครั้ง หลังจากนั้นข้อความนั้นจะถูก Nacked และจะถูกส่งใหม่โดย PubSub
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer จัดการกับการพยายามในฮาน์เดิล
// ในกรณีนี้ มันจะนำมันไปเป็นข้อผิดพลาดให้กับมิดเดิลรีไทร์
middleware.Recoverer,
)
// เพื่อความง่าย เราใช้ gochannel Pub/Sub ที่นี่
// คุณสามารถแทนที่ด้วยการปฏิบัติตามใด ๆ ของการปฏิบัติตาม Pub/Sub และมันจะทำงานอย่างเดียวกัน
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// เผยแพร่ข้อความเข้ามาบางส่วนในพื้นหลัง
go publishMessages(pubSub)
// AddHandler คืนฮาน์เดิลที่สามารถนำไปใช้เพิ่มมิดเดิลที่ระดับฮาน์เดิล หรือหยุดฮาน์เดิล
handler := router.AddHandler(
"struct_handler", // ชื่อของฮาน์เดิล จะต้องเป็นเอกลักษณ์
"incoming_messages_topic", // หัวข้อที่เหตุการณ์จะถูกอ่านเข้ามา
pubSub,
"outgoing_messages_topic", // หัวข้อที่เหตุการณ์จะถูกเผยแพร่
pubSub,
structHandler{}.Handler,
)
// มิดเดิลระดับฮาน์เดิลจะถูกทำงานเฉพาะสำหรับฮาน์เดิลที่เฉพาะเจาะจง
// มิดเดิลเช่นนี้สามารถถูกเพิ่มไปยังฮาน์เดิลได้โดยวิธีเดียวกันกับมิดเดิลระดับเราเตอร์
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("การทำมิดเดิลเฉพาะสำหรับฮาน์เดิลกำลังดำเนินการสำหรับ", message.UUID)
return h(message)
}
})
// เพียงเพื่อวัตถุประสงค์ในการดีบักเท่านั้น เราพิมพ์ข้อความทั้งหมดที่ได้รับไปยัง `incoming_messages_topic`
router.AddNoPublisherHandler(
"พิมพ์ข้อความที่เข้ามา",
"incoming_messages_topic",
pubSub,
printMessages,
)
// เพียงเพื่อวัตถุประสงค์ในการดีบักเท่านั้น เราพิมพ์เหตุการณ์ทั้งหมดที่ส่งไปยัง `outgoing_messages_topic`
router.AddNoPublisherHandler(
"พิมพ์เหตุการณ์ที่ส่งออก",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// ตอนนี้หลังจากที่มีการลงทะเบียนฮาน์เดิลทั้งหมดแล้ว เราสามารถเรียกเราเตอร์ได้
// การเรียกจะบล็อคจนกว่าเราเตอร์จะหยุดทำงาน
// ...
มิดเดิลที่มีพร้อมให้ใช้งาน
นี่คือมิดเดิลที่สามารถนำมาใช้ใหม่โดย Watermill และคุณยังสามารถสร้างมิดเดิลของคุณเองได้ง่าย ๆ ตัวอย่างเช่น หากคุณต้องการเก็บข้อความที่เข้ามาแต่ละอันในรูปแบบบางแบบของบันทึกนี่คือวิธีที่ดีที่สุดที่จะทำให้มันเป็นไปได้
การตัดสินใจแบบเบรกเกอร์
// CircuitBreaker เป็น middleware ที่ห่อหุ้ม handler ด้วยเบรกเกอร์
// โดยอิงจากการกำหนดค่างานว่าเบรกเกอร์จะทำงานแบบ fast fail หาก handler ทำงานให้เกิด error ต่อเนื่อง
// มีประโยชน์ในการป้องกันการลาดตะเทือนที่กัน
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker สร้าง middleware CircuitBreaker ใหม่
// สำหรับการตั้งค่าที่ใช้สามารถอ้างอิงได้จากเอกสารของ gobreaker
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware คืนค่า CircuitBreaker middleware
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
การสัมพันธ์
// SetCorrelationID ตั้งค่า correlation ID สำหรับข้อความ
//
// เมื่อข้อความเข้าสู่ระบบ SetCorrelationID ควรถูกเรียก
// เมื่อมีการสร้างข้อความในคำขอ (เช่น HTTP) correlation ID ของข้อความควรเหมือนกับ correlation ID ของคำขอ
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID คืนค่า correlation ID จากข้อความ
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID เพิ่ม correlation ID ให้กับข้อความทั้งหมดที่สร้างโดย handler
// รหัสความจุของ ID ขึ้นกับ ID ข้อความที่ได้รับจาก handler
//
// เพื่อให้ CorrelationID ทำงานถูกต้อง SetCorrelationID ต้องถูกเรียกก่อนสำหรับข้อความที่เข้าสู่ระบบ
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
การทำซ้ำ
// Duplicator ประมวลผลข้อความสองครั้งเพื่อให้แน่ใจว่าจุดปลายทางเป็นฟังก์ชันไม่เช่นทำ
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
การละเว้นข้อผิดพลาด
// IgnoreErrors ให้ middleware ที่เรียกรับจาก handler มองเห็น error บางรายการที่ถูกกำหนดไว้โดยชัดเจน
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors สร้าง middleware IgnoreErrors ใหม่
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware คืนค่า IgnoreErrors middleware
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
การยอมรับโดยทันที
// InstantAck ทำให้ handler ยอมรับข้อความที่เข้ามาทันที โดยไม่คำนึงถึงข้อผิดพลาด
// สามารถใช้เพื่อปรับปรุงประสิทธิภาพ แต่การแลกเปลี่ยนคือ:
// หากคุณต้องการให้มั่นใจในการส่งมอบอย่างแท้จริง คุณอาจได้มากกว่าครั้งใด
// หากคุณต้องการข้อความที่มีลำดับ อาจทำให้ลำดับเปลี่ยนการเปลี่ยนลำดับ
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
ยาพิษ
// PoisonQueue ให้คุณสามารถใช้ middleware เพื่อจัดการข้อความที่ไม่สามารถประมวลผลได้ และเผยแพร่ข้อความเหล่านั้นไปยังหัวข้อที่แตกต่าง
// จากนั้น middleware หลักจะต่อการทำงานต่อไปเช่นเดิม
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter เหมือนกับ PoisonQueue แต่ยังรับฟังก์ชันเพื่อกำหนดเงื่อนไขของข้อผิดพลาดที่เหมาะสมกับคิวพิษ
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
ความล้มเหลวแบบสุ่ม
// RandomFail ทำให้ handler ล้มเหลวตามความน่าจะเป็นที่สุ่มได้ ความน่าจะเป็นของข้อผิดพลาดควรอยู่ในช่วง (0, 1)
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("เกิดข้อผิดพลาดแบบสุ่ม")
}
return h(message)
}
}
}
// RandomPanic ทำให้ handler สร้างการขัดข้องตามความน่าจะเป็นที่สุ่มได้ ความน่าจะเป็นของการขัดข้องควรอยู่ในช่วง (0, 1)
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("เกิดการขัดข้องแบบสุ่ม")
}
return h(message)
}
}
}
Recoverer
// RecoveredPanicError เก็บข้อผิดพลาดที่กลับมาจากการขัดข้องและข้อมูล stack trace ของมัน
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer กำหนดให้ผู้ใช้ middleware เพื่อกู้คืนการขัดข้องจาก handler และแนบ RecoveredPanicError พร้อมกับ stack trace ไปยังข้อผิดพลาดใด ๆ ที่ส่งกลับมาจาก handler
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
ลองใหม่
// Retry เป็นการให้ middleware ที่ลองใหม่การดำเนินการหากมีข้อผิดพลาดที่ถูกส่งกลับมา
// พฤติกรรมการลองใหม่ การย้อนกลับที่เพิ่มขึ้น เวลาที่ผ่านไปสูงสุดสามารถกำหนดค่าได้
type Retry struct {
// MaxRetries เป็นจำนวนครั้งสูงสุดที่จะลอง
MaxRetries int
// InitialInterval เป็นช่วงเวลาเริ่มต้นระหว่างการลองใหม่ ช่วงเวลาต่อไปจะถูกปรับขนาดโดย Multiplier
InitialInterval time.Duration
// MaxInterval กำหนดขีดจำกัดบนสำหรับการย้อนกลับโดยกำหนดเวลาสูงสุด
MaxInterval time.Duration
// Multiplier เป็นตัวคูณที่ซึ่งช่วงรอระหว่างการลองใหม่จะถูกคูณ
Multiplier float64
// MaxElapsedTime กำหนดขีดจำกัดเวลาสูงสุดสำหรับการลองใหม่ ถ้าเป็น 0 จะไม่ใช้งาน
MaxElapsedTime time.Duration
// RandomizationFactor กระจายเวลารอในช่วงดังต่อไปนี้
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook เป็นฟังก์ชันที่ไม่บังคับที่จะทำการดำเนินการทุกครั้งที่ลองใหม่
// จำนวนครั้งของการลองใหม่ถูกส่งผ่าน retryNum
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware ส่งคืน Retry middleware
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
การจำกัด
// Throttle เป็นการให้ middleware ที่จำกัดจำนวนข้อความที่ถูกระบบประมวลผลภายในระยะเวลาที่กำหนด
// นี้สามารถใช้เพื่อป้องกันการเฝ้ารอคิวที่ยาวและทิ้งไม้ที่ทำงานบนตัวจัดการ
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle สร้าง Throttle middleware ใหม่
// ตัวอย่าง ระยะเวลา และ จำนวน: NewThrottle(10, time.Second) หมายถึง 10 ข้อความต่อวินาที
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware ส่งคืน Throttle middleware
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// Throttles shared by multiple handlers will wait for their "ticks".
การหมดเวลา
// การหมดเวลา ยกเลิกข่อความที่ถูกส่งมาจาก context ที่กำหนดหลังจากระยะเวลาที่ระบุ
// ฟังก์ชันที่อาจต้องการการหมดเวลาของตัวจัดการควรรับฟังต์ชัน msg.Context().Done() เพื่อทราบเมื่อมีความผิดพลาด
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}