Giới thiệu
Middleware được sử dụng để mở rộng framework sự kiện, thêm chức năng tùy chỉnh và cung cấp các chức năng quan trọng không liên quan đến logic của trình xử lý chính. Ví dụ, thử lại trình xử lý sau khi trả về một lỗi, hoặc khôi phục sau khi xảy ra sự cố và chụp đám mây trong trình xử lý.
Chữ ký chức năng middleware được định nghĩa như sau:
Full source code: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware cho phép chúng ta viết các decorator tương tự như trình xử lý.
// Nó có thể thực hiện một số hoạt động trước trình xử lý (ví dụ: sửa đổi thông điệp được tiêu thụ)
// và cũng thực hiện một số hoạt động sau trình xử lý (sửa đổi thông điệp được sản xuất, ACK/NACK thông điệp được tiêu thụ, xử lý lỗi, logging, v.v.).
//
// Nó có thể được gắn vào bộ định tuyến bằng cách sử dụng phương thức `AddMiddleware`.
//
// Ví dụ:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("Trước khi thực thi trình xử lý")
// producedMessages, err := h(message)
// fmt.Println("Sau khi thực thi trình xử lý")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Sử dụng
Middleware có thể được áp dụng cho tất cả các xử lý trong bộ định tuyến hoặc cho các xử lý cụ thể. Khi middleware được thêm trực tiếp vào bộ định tuyến, nó sẽ được áp dụng cho tất cả các xử lý được cung cấp cho bộ định tuyến. Nếu một middleware chỉ được áp dụng cho một xử lý cụ thể, nó cần được thêm vào xử lý trong bộ định tuyến.
Dưới đây là một ví dụ về cách sử dụng:
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Khi nhận được tín hiệu SIGTERM, SignalsHandler sẽ đóng bộ định tuyến một cách trơn tru.
// Bạn cũng có thể đóng bộ định tuyến bằng cách gọi `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Middleware cấp độ bộ định tuyến sẽ được thực thi trên mỗi thông điệp được gửi đến bộ định tuyến
router.AddMiddleware(
// CorrelationID sẽ sao chép ID tương quan từ siêu dữ liệu thông điệp đến thông điệp được tạo ra
middleware.CorrelationID,
// Nếu xử lý trả về một lỗi, nó sẽ được thử lại.
// Nó sẽ được thử lại tối đa MaxRetries lần, sau đó thông điệp sẽ bị Nacked và được gửi lại bởi PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer xử lý sự cố trong xử lý.
// Trong trường hợp này, nó chuyển chúng thành lỗi cho middleware Retry.
middleware.Recoverer,
)
// Vì sự đơn giản, chúng ta sử dụng gochannel Pub/Sub ở đây,
// bạn có thể thay thế nó bằng bất kỳ triển khai Pub/Sub nào khác và nó sẽ vẫn hoạt động tương tự.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Đăng các thông điệp đến đến nền
go publishMessages(pubSub)
// AddHandler trả về một xử lý có thể được sử dụng để thêm middleware cấp độ xử lý
// hoặc để dừng xử lý.
handler := router.AddHandler(
"struct_handler", // Tên xử lý, phải là duy nhất
"incoming_messages_topic", // Chủ đề mà từ đó sự kiện sẽ được đọc
pubSub,
"outgoing_messages_topic", // Chủ đề mà sự kiện sẽ được xuất bản
pubSub,
structHandler{}.Handler,
)
// Middleware cấp độ xử lý chỉ được thực thi cho các xử lý cụ thể
// Middleware như vậy có thể được thêm vào xử lý theo cách tương tự như middleware cấp độ bộ định tuyến
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Thực thi middleware cụ thể cho trình xử lý", message.UUID)
return h(message)
}
})
// Chỉ cho mục đích gỡ lỗi, chúng ta in tất cả các thông điệp nhận được trên `incoming_messages_topic`
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Chỉ cho mục đích gỡ lỗi, chúng ta in tất cả sự kiện gửi đến `outgoing_messages_topic`
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Bây giờ khi tất cả các xử lý đã được đăng ký, chúng ta có thể chạy bộ định tuyến.
// Hàm Run sẽ chặn cho đến khi bộ định tuyến dừng chạy.
// ...
Middleware Có Sẵn
Dưới đây là các middleware có thể tái sử dụng do Watermill cung cấp và bạn cũng có thể dễ dàng triển khai middleware của riêng bạn. Ví dụ, nếu bạn muốn lưu trữ mỗi thông điệp đến trong một định dạng log loại nhất định, đây là cách tốt nhất để làm điều đó.
Circuit Breaker
// CircuitBreaker là một middleware bao bọc handler trong một máy cắt mạch.
// Dựa trên cấu hình, máy cắt mạch sẽ nhanh chóng thất bại nếu handler tiếp tục trả về lỗi.
// Điều này hữu ích để ngăn chặn sự thất bại lan truyền.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker trả về một middleware CircuitBreaker mới.
// Đối với cài đặt có sẵn, vui lòng tham khảo tài liệu gobreaker.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware trả về middleware CircuitBreaker.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
Correlation
// SetCorrelationID thiết lập ID tương quan cho tin nhắn.
//
// Khi một tin nhắn nhập hệ thống, hãy gọi SetCorrelationID.
// Khi một tin nhắn được tạo ra trong một yêu cầu (ví dụ như HTTP), correlation ID của tin nhắn nên giống như correlation ID của yêu cầu.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID trả về correlation ID từ tin nhắn.
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID thêm một correlation ID vào tất cả các tin nhắn được tạo ra bởi handler.
// ID dựa trên message ID nhận được bởi handler.
//
// Để CorrelationID hoạt động đúng, cần gọi SetCorrelationID trước tiên để tin nhắn nhập hệ thống.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
Duplicator
// Duplicator xử lý tin nhắn hai lần để đảm bảo điểm cuối là idempotent.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
Ignore Errors
// IgnoreErrors cung cấp một middleware cho phép handler bỏ qua một số lỗi được xác định một cách rõ ràng.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors tạo một middleware IgnoreErrors mới.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware trả về middleware IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
Instant Ack
// InstantAck khiến handler ngay lập tức xác nhận tin nhắn đầu vào, bất chấp bất kỳ lỗi nào.
// Nó có thể được sử dụng để cải thiện thông lượng, nhưng phải đánh đổi là:
// Nếu bạn cần đảm bảo giao hàng chính xác một lần, bạn có thể nhận được ít nhất một lần giao hàng.
// Nếu bạn yêu cầu tin nhắn có thứ tự, nó có thể phá vỡ thứ tự.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
Độc hại
// PoisonQueue cung cấp một tính năng middleware để xử lý các thông điệp không thể xử lý và gửi chúng đến một chủ đề riêng biệt.
// Sau đó, chuỗi middleware chính tiếp tục thực thi, và doanh nghiệp tiếp tục như thông thường.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter tương tự như PoisonQueue, nhưng chấp nhận một hàm để xác định lỗi nào đáp ứng tiêu chí hàng đợi độc hại.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
Lỗi Ngẫu Nhiên
// RandomFail làm cho trình xử lý thất bại dựa trên xác suất ngẫu nhiên. Xác suất lỗi phải nằm trong khoảng (0, 1).
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("đã xảy ra lỗi ngẫu nhiên")
}
return h(message)
}
}
}
// RandomPanic làm cho trình xử lý gây ra sự kinh hoàng dựa trên xác suất ngẫu nhiên. Xác suất kinh hoàng phải nằm trong khoảng (0, 1).
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("đã xảy ra sự kinh hoàng ngẫu nhiên")
}
return h(message)
}
}
}
Người phục hồi
// RecoveredPanicError chứa lỗi kinh hoàng đã phục hồi và thông tin vết tích ngăn xếp của nó.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer phục hồi bất kỳ sự kinh hoàng từ trình xử lý và gán RecoveredPanicError với thông tin vết tích ngăn xếp vào bất kỳ lỗi nào được trả về từ trình xử lý.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
Thử lại
// Retry cung cấp một middleware để thử lại xử lý nếu có lỗi trả về.
// Hành vi thử lại, đợi lệch mũi tên và thời gian tối đa có thể được cấu hình.
type Retry struct {
// MaxRetries là số lần thử lại tối đa được thực hiện.
MaxRetries int
// InitialInterval là khoảng thời gian ban đầu giữa các lần thử lại. Các khoảng thời gian sau sẽ được nhân với Multiplier.
InitialInterval time.Duration
// MaxInterval thiết lập giới hạn trên cho đợi lệch mũi tên theo cấp số nhân.
MaxInterval time.Duration
// Multiplier là hệ số mà khoảng thời gian chờ đợi giữa các lần thử lại sẽ được nhân lên.
Multiplier float64
// MaxElapsedTime thiết lập thời gian tối đa cho việc thử lại. Nếu là 0, nó sẽ bị vô hiệu hóa.
MaxElapsedTime time.Duration
// RandomizationFactor ngẫu nhiên lan trải thời gian lệch mũi tên trong phạm vi sau:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook là một hàm tùy chọn được thực thi trên mỗi lần thử lại.
// Số lần thử lại hiện tại được truyền qua retryNum.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware trả về middleware Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
Giảm tốc
// Throttle cung cấp một middleware để giới hạn số lượng tin nhắn được xử lý trong khoảng thời gian nhất định.
// Điều này có thể được sử dụng để ngăn chặn việc quá tải cho các xử lý đang chạy trên hàng đợi chưa được xử lý.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle tạo một middleware Throttle mới.
// Ví dụ về khoảng thời gian và số lượng: NewThrottle(10, time.Second) cho biết 10 tin nhắn mỗi giây.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware trả về middleware Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// Các giảm tốc được chia sẻ bởi nhiều xử lý viên sẽ chờ đợi cho "tick" của họ.
Hết thời gian
// Timeout hủy bỏ bộ ngữ cảnh tin nhắn đầu vào sau khoảng thời gian được chỉ định.
// Bất kỳ chức năng nhạy cảm với thời gian chờ đợi của xử lý viên nên lắng nghe msg.Context().Done() để biết khi nào phải thất bại.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}