Publisher và Subscriber là những phần cấp thấp của Watermill. Trong ứng dụng thực tế, bạn thường muốn sử dụng các giao diện và chức năng cấp cao, như kết nối, đánh giá, hàng đợi tin nhắn sai, thử lại, hạn chế tốc độ, v.v.
Đôi khi, bạn có thể không muốn gửi Ack khi xử lý thành công. Đôi khi, bạn có thể muốn gửi một tin nhắn sau khi tin nhắn khác được xử lý.
Để đáp ứng những yêu cầu này, có một thành phần gọi là Router.
Cấu hình
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout xác định thời gian mà router nên làm việc cho các bộ xử lý khi đóng.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate kiểm tra xem có bất kỳ lỗi nào trong cấu hình router không.
func (c RouterConfig) Validate() error {
return nil
}
// ...
Xử lý viên
Trước hết, bạn cần triển khai hàm HandlerFunc
:
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc là hàm được gọi khi một thông điệp được nhận.
//
// Khi HandlerFunc không trả về lỗi, msg.Ack() sẽ tự động được gọi.
//
// Khi HandlerFunc trả về lỗi, msg.Nack() sẽ được gọi.
//
// Khi msg.Ack() được gọi trong hàndler và HandlerFunc trả về lỗi,
// msg.Nack() sẽ không được gửi vì Ack đã được gửi trước đó.
//
// Khi nhận nhiều thông điệp (do msg.Ack() được gửi trong HandlerFunc hoặc Subscriber hỗ trợ nhiều người tiêu dùng),
// HandlerFunc sẽ được thực thi song song.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
Tiếp theo, bạn cần sử dụng Router.AddHandler
để thêm một xử lý viên mới:
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler thêm một xử lý viên mới.
// handlerName phải là duy nhất. Hiện tại, chỉ được sử dụng cho mục đích gỡ lỗi.
// subscribeTopic là chủ đề mà xử lý viên sẽ nhận thông điệp từ.
// publishTopic là chủ đề mà các thông điệp được trả về của xử lý viên sẽ được tạo bởi Router.
// Khi xử lý viên cần xuất bản đến nhiều chủ đề,
// được khuyến nghị chỉ tiêm cấp Publisher vào xử lý viên hoặc triển khai middleware,
// có thể bắt các thông điệp dựa trên siêu dữ liệu và xuất bản đến các chủ đề cụ thể.
// Nếu một xử lý viên được thêm khi router đang chạy, RunHandlers() cần được gọi một cách rõ ràng.
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Thêm xử lý viên", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped không luôn đang chờ để handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler thêm một xử lý viên mới.
// Xử lý viên này không thể trả về thông điệp.
// Khi nó trả về một thông điệp, xảy ra lỗi và một Nack được gửi.
//
// handlerName phải là duy nhất. Hiện tại, chỉ được sử dụng cho mục đích gỡ lỗi.
// subscribeTopic là chủ đề mà xử lý viên sẽ nhận thông điệp từ.
// subscriber là người đăng ký được sử dụng để tiêu thụ thông điệp.
// Nếu một xử lý viên được thêm khi router đang chạy, RunHandlers() cần được gọi một cách rõ ràng.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
Xem ví dụ sử dụng trong "Bắt đầu". Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler trả về một handler có thể được sử dụng để thêm middleware cấp handler hoặc dừng handlers.
handler := router.AddHandler(
"struct_handler", // tên của handler, phải là duy nhất
"incoming_messages_topic", // chủ đề từ đó các sự kiện được đọc
pubSub,
"outgoing_messages_topic", // chủ đề để xuất bản sự kiện
pubSub,
structHandler{}.Handler,
)
// Middleware cấp handler chỉ được thực thi cho các handler cụ thể
// Middleware này có thể được thêm vào cùng cách như middleware cấp router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Thực thi middleware cụ thể của handler, UUID của message: ", message.UUID)
return h(message)
}
})
// ...
Không có xử lý người xuất bản
Không phải tất cả các xử lý sẽ tạo ra một tin nhắn mới. Bạn có thể sử dụng Router.AddNoPublisherHandler
để thêm loại xử lý này:
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler thêm một xử lý mới.
// Xử lý này không thể trả về các tin nhắn.
// Khi nó trả về một tin nhắn, một lỗi sẽ xảy ra và Nack sẽ được gửi.
//
// handlerName phải là duy nhất và hiện chỉ được sử dụng cho mục đích gỡ lỗi.
//
// subscribeTopic là chủ đề mà xử lý sẽ nhận các tin nhắn.
//
// subscriber được sử dụng để tiêu thụ các tin nhắn.
//
// Nếu bạn thêm một xử lý vào một router đã đang chạy, bạn cần gọi một cách rõ ràng RunHandlers().
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
Sự nhận biết
Mặc định, khi HanderFunc
không trả về lỗi, msg.Ack()
sẽ được gọi. Nếu có lỗi trả về, msg.Nack()
sẽ được gọi. Vì vậy, sau khi xử lý tin nhắn, bạn không cần gọi msg.Ack()
hoặc msg.Nack
(tất nhiên, bạn có thể nếu bạn muốn).
Tạo các tin nhắn
Khi nhiều tin nhắn được trả về bởi xử lý, hãy lưu ý rằng hầu hết các triển khai Publisher không hỗ trợ xuất bản nguyên tử của các tin nhắn. Nếu như môi trường broker hoặc lưu trữ không khả dụng, chỉ một số tin nhắn có thể được tạo ra và msg.Nack()
sẽ được gửi.
Nếu đây là một vấn đề, hãy xem xét việc mỗi xử lý chỉ xuất bản một tin nhắn.
Chạy Router
Để chạy router, bạn cần gọi Run()
.
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run chạy tất cả các plugin và xử lý, và bắt đầu đăng ký theo các chủ đề được cung cấp.
// Cuộc gọi này chặn trong khi router đang chạy.
//
// Khi tất cả các xử lý dừng lại (ví dụ: vì đăng ký đã được đóng), router cũng sẽ dừng lại.
//
// Để dừng Run(), bạn nên gọi Close() trên router.
//
// ctx sẽ được truyền đạt cho tất cả các subscriber.
//
// Khi tất cả các xử lý dừng lại (ví dụ: vì kết nối đã đóng), Run() cũng sẽ dừng lại.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
Đảm bảo Router đang chạy
Hiểu biết về việc xem xét xem router đang chạy có thể hữu ích. Bạn có thể đạt được điều này bằng cách sử dụng phương thức Running()
.
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running đóng lại khi router đang chạy.
// Nói cách khác, bạn có thể chờ cho router đã đang chạy như sau:
// fmt.Println("Starting router")
// go r.Run(ctx)
// // fmt.Println("Router is running")
// Cảnh báo: Vì lý do lịch sử, kênh này không biết về việc đóng router - nó sẽ đóng nếu router tiếp tục chạy và sau đó dừng lại.
func (r *Router) Running() chan struct{} {
// ...
}
Bạn cũng có thể sử dụng hàm IsRunning
trả về một giá trị boolean:
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning trả về true khi router đang chạy.
//
// Cảnh báo: Vì lý do lịch sử, phương pháp này không biết về trạng thái đã đóng của router.
// Nếu bạn muốn biết router đã đóng hay chưa, hãy sử dụng IsClosed.
func (r *Router) IsRunning() bool {
// ...
}
Tắt bộ định tuyến
Để tắt bộ định tuyến, bạn cần gọi Close()
.
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close đóng bộ định tuyến một cách ưa thích với một khoảng thời gian chờ được cung cấp trong cấu hình.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
sẽ tắt tất cả các nhà xuất bản và người đăng ký, và đợi cho tất cả các xử lý viên hoàn thành.
Close()
sẽ đợi đến khi hết thời gian đặt trong RouterConfig.CloseTimeout
trong cấu hình. Nếu hết thời gian, Close()
sẽ trả về một lỗi.
Thêm xử lý viên sau khi bắt đầu bộ định tuyến
Bạn có thể thêm một xử lý viên mới khi bộ định tuyến đã chạy. Để làm điều này, bạn cần gọi AddNoPublisherHandler
hoặc AddHandler
, sau đó gọi RunHandlers
.
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers chạy tất cả các xử lý viên đã được thêm sau Run().
// RunHandlers là idempotent, do đó có thể gọi nhiều lần một cách an toàn.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
Dừng các xử lý viên đang chạy
Bạn có thể dừng chỉ một xử lý viên đang chạy bằng cách gọi Stop()
.
Vui lòng lưu ý rằng bộ định tuyến sẽ tắt khi không còn xử lý viên nào đang chạy.
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop dừng xử lý viên.
// Stop là bất đồng bộ.
// Bạn có thể kiểm tra xem xử lý viên đã được dừng với hàm Stopped().
func (h *Handler) Stop() {
// ...
Mô hình thực thi
Các người đăng ký có thể tiêu thụ một thông điệp duy nhất theo thứ tự hoặc nhiều thông điệp song song.
- Luồng Một thông điệp duy nhất là phương pháp đơn giản nhất, có nghĩa là người đăng ký sẽ không nhận được bất kỳ thông điệp nào mới cho đến khi
msg.Ack()
được gọi. - Luồng Nhiều thông điệp được hỗ trợ chỉ bởi một số người đăng ký. Bằng cách đăng ký nhiều phân vùng chủ đề đồng thời, có thể tiêu thụ nhiều thông điệp song song, thậm chí là các thông điệp chưa được xác nhận trước đó (ví dụ: cách làm việc của các người đăng ký Kafka). Bộ định tuyến xử lý mô hình này bằng cách chạy
HandlerFunc
song song.
Vui lòng tham khảo tài liệu Pub/Sub đã chọn để hiểu các mô hình thực hiện được hỗ trợ.
Trung gian
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware cho phép chúng ta viết một cái gì đó tương tự như một trang trí cho HandlerFunc.
// Nó có thể thực hiện một số hoạt động trước (ví dụ: sửa đổi thông điệp tiêu thụ) hoặc sau xử lý viên (sửa đổi thông điệp đã tạo, xác nhận/từ chối thông điệp tiêu thụ, xử lý lỗi, ghi nhật ký, v.v.).
//
// Nó có thể được gắn vào bộ định tuyến bằng cách sử dụng phương pháp `AddMiddleware`.
//
// Ví dụ:
//
// func ExampleMiddleware(h HandlerFunc) HandlerFunc {
// return func(message *Message) ([]*Message, error) {
// fmt.Println("thực thi trước xử lý viên")
// producedMessages, err := h(message)
// fmt.Println("thực thi sau xử lý viên")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Danh sách đầy đủ của trung gian tiêu chuẩn có thể được tìm thấy trong Trung gian.
Các plugin
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin là một hàm được thực thi khi bộ định tuyến bắt đầu.
type RouterPlugin func(*Router) error
// ...
Danh sách đầy đủ của các plugin tiêu chuẩn có thể được tìm thấy trong message/router/plugin.
Ngữ cảnh
Một số giá trị hữu ích được lưu trữ trong context
cho mỗi tin nhắn nhận được bởi người xử lý:
Đoạn mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx trả về tên của người xử lý tin nhắn trong bộ định tuyến đã tiêu thụ tin nhắn từ context.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx trả về tên của loại người xuất bản tin nhắn trong bộ định tuyến từ context.
// Ví dụ, đối với Kafka, nó sẽ là `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx trả về tên của loại người đăng ký tin nhắn trong bộ định tuyến từ context.
// Ví dụ, đối với Kafka, nó sẽ là `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx trả về chủ đề từ đó tin nhắn đã được nhận trong bộ định tuyến từ context.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx trả về chủ đề mà tin nhắn sẽ được xuất bản trong bộ định tuyến từ context.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...