Publisher و Subscriber بخشهای پایینی Watermill هستند. در برنامههای عملیاتی، معمولاً میخواهید از رابطها و توابع سطح بالا مانند ارتباطات، متریکها، صفهای پیام سم زده، تلاش مجدد، محدودیت نرخ و غیره استفاده کنید.
گاهی اوقات، ممکن است بخواهید هنگام پردازش موفق بدون Ack ارسال نکنید. گاهی اوقات، ممکن است بخواهید پیامی را پس از پردازش پیام دیگری ارسال کنید.
برای برآورده کردن این نیازها، یک مولفه به نام Router وجود دارد.
تنظیمات
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout تعیین میکند که روتر باید چه مدت برای دستگیرهها کار کند هنگام بسته شدن.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate بررسی میکند که آیا در پیکربندی روتر هر گونه خطا وجود دارد یا خیر.
func (c RouterConfig) Validate() error {
return nil
}
// ...
دستگیره
اولین کاری که باید انجام دهید، پیادهسازی تابع HandlerFunc
میباشد:
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc تابعی است که هنگام دریافت یک پیام فراخوانی میشود.
//
// زمانی که تابع HandlerFunc خطا ندارد، خودکار msg.Ack() فراخوانی خواهد شد.
//
// زمانی که تابع HandlerFunc یک خطا برمیگرداند، msg.Nack() فراخوانی خواهد شد.
//
// زمانی که msg.Ack() در داخل دستگیره فراخوانی شود و تابع HandlerFunc یک خطا برمیگرداند،
// msg.Nack() ارسال نخواهد شد زیرا Ack از پیش ارسال شده است.
//
// زمانی که چندین پیام دریافت میشود (به دلیل ارسال msg.Ack() در HandlerFunc یا Subscriber که از چندین مصرفکننده پشتیبانی میکند)،
// HandlerFunc به طور همزمان اجرا خواهد شد.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
بعداً باید از Router.AddHandler
استفاده کرده و یک دستگیره جدید اضافه کنید:
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler یک دستگیره جدید اضافه میکند.
//
// handlerName باید یکتا باشد. در حال حاضر فقط برای اشکالزدایی استفاده میشود.
//
// subscribeTopic، موضوعی است که دستگیره پیامها را از آن دریافت میکند.
//
// publishTopic، موضوعی است که پیامهای بازگشتی دستگیره توسط Router تولید میشوند.
//
// هنگامی که دستگیره نیاز به انتشار در چندین موضوع دارد،
// توصیه میشود فقط Publisher را به دستگیره وارد کنید یا middleware پیادهسازی کنید،
// که میتواند پیامها را بر اساس فهرستی از متادیتاها گرفته و آنها را در موضوعهای خاص منتشر کند.
//
// اگر یک دستگیره در حال اجرا است در هنگام اضافه کردن دستگیره، باید به صورت صریح RunHandlers() صدا زده شود.
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Adding handler", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped تا همیشه منتظر handlerAdded نیست
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler یک دستگیره جدید اضافه میکند.
// این دستگیره نمیتواند پیامها را بازگشتی کند.
// زمانی که یک پیام بازگشتی میدهد، یک خطا رخ میدهد و یک Nack ارسال میشود.
//
// handlerName باید یکتا باشد. در حال حاضر فقط برای اشکالزدایی استفاده میشود.
// subscribeTopic، موضوعی است که دستگیره پیامها را از آن دریافت میکند.
// subscriber، یک مشترک استفاده شده برای مصرف پیامها است.
// اگر یک دستگیره در حال اجرا است در هنگام اضافه کردن دستگیره، باید به صورت صریح RunHandlers() صدا زده شود.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
برای مثال استفاده شده در "شروع کار" ارجاع دهید. کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler یک handler برمیگرداند که میتوان از آن برای افزودن middleware سطح handler استفاده کرد یا handlerها را متوقف کرد.
handler := router.AddHandler(
"struct_handler", // نام handler که باید یکتا باشد
"incoming_messages_topic", // topic از جایگاهی که رویدادها خوانده میشوند
pubSub,
"outgoing_messages_topic", // topic برای انتشار رویدادها
pubSub,
structHandler{}.Handler,
)
// middleware سطح handler فقط برای handlerهای خاص اجرا میشود
// این middleware میتواند به همان روش middleware سطح router اضافه شود
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("اجرای middleware خاص handler، UUID پیام: ", message.UUID)
return h(message)
}
})
// ...
بدون دستکاری گر
هر دستکاری گری پیام جدیدی ایجاد نخواهد کرد. میتوانید از Router.AddNoPublisherHandler
برای افزودن این نوع دستکاری گر استفاده کنید:
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler یک دستکاری گر جدید اضافه میکند.
// این دستکاری گر نمیتواند پیامها را برگرداند.
// وقتی یک پیام برگردانده شود، یک خطا رخ میدهد و Nack ارسال میشود.
//
// handlerName باید یکتا باشد و در حال حاضر تنها برای اهداف اشکال زدایی استفاده میشود.
//
// subscribeTopic موضوعی است که دستکاری گر دریافت پیامها را دریافت میکند.
//
// subscriber برای مصرف پیامها استفاده میشود.
//
// اگر یک دستکاری گر را به یک مسیری که در حال اجرا است اضافه کنید، باید به طور صریح RunHandlers() را فراخوانی کنید.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
تشکر
به طور پیشفرض، وقتی HanderFunc
خطا برنگرداند، msg.Ack()
فراخوانی خواهد شد. اگر یک خطا برگردانده شود، msg.Nack()
فراخوانی خواهد شد. بنابراین، پس از بررسی پیام، شما نیاز به فراخوانی msg.Ack()
یا msg.Nack
ندارید (البته، اگر بخواهید میتوانید).
ایجاد پیامها
در صورت بازگرداندن چندین پیام توسط دستکاری گر، لطفاً توجه داشته باشید که اکثر پیادهسازیهای Publisher از انتشار اتمی پیامها پشتیبانی نمیکنند. اگر بروکر یا فضای ذخیرهسازی در دسترس نباشد، فقط برخی از پیامها ایجاد خواهند شد و msg.Nack()
ارسال خواهد شد.
اگر این یک مشکل است، در نظر داشته باشید که هر دستکاری گر فقط یک پیام را انتشار دهد.
اجرا کردن Router
برای اجرای روتر، باید Run()
را فراخوانی کنید.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run تمام افزونهها و دستکاری گرها را اجرا میکند و شروع به مشترک شدن در موضوعات داده شده میکند.
// این فراخوانی در حالی که روتر در حال اجرا است، مسدود میشود.
//
// وقتی تمام دستکاری گرها متوقف میشوند (به عنوان مثال به دلیل بسته شدن اشتراک)، روتر همچنین متوقف خواهد شد.
//
// برای متوقف کردن Run()، شما باید Close() را برای روتر فراخوانی کنید.
//
// ctx به تمام مشترکها منتقل میشود.
//
// وقتی تمام دستکاری گرها متوقف میشوند (مثلاً به دلیل اتصالات بسته شده)، Run() همچنین متوقف خواهد شد.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
اطمینان از اجرای روتر
درک این که آیا روتر در حال اجرا است ممکن است مفید باشد. با استفاده از روش Running()
میتوانید به این هدف دست یابید.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running زمانی بسته میشود که روتر در حال اجرا است.
// به عبارت دیگر، شما میتوانید صبر کنید تا روتر اجرا شود، به این صورت:
// fmt.Println("شروع روتر")
// go r.Run(ctx)
// // fmt.Println("روتر در حال اجرا است")
// هشدار: به دلایل تاریخی، این کانال از بستن وضعیت خاتمه دهنده روتر مطلع نمیشود - اگر روتر ادامه دهد و سپس خاموش شود، بسته خواهد شد.
func (r *Router) Running() chan struct{} {
// ...
}
همچنین میتوانید از تابع IsRunning
استفاده کنید که مقدار بولین را برمیگرداند:
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning زمانی true بر میگرداند که روتر در حال اجرا است.
//
// هشدار: به دلایل تاریخی، این متد درباره وضعیت بسته شده روتر آگاه نیست.
// اگر میخواهید بدانید آیا روتر بسته شده است، از IsClosed استفاده کنید.
func (r *Router) IsRunning() bool {
// ...
}
خاموش کردن روتر
برای خاموش کردن روتر، باید Close()
را صدا بزنید.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
تمام انتشارکنندهها و مشترکها را خاموش میکند و منتظر تکمیل شدن تمام دستگیریها میماند.
Close()
منتظر میماند تا زمان مقرر شده در RouterConfig.CloseTimeout
در تنظیمات گذشته شود. اگر زمان مقرر برسد، Close()
یک خطا برمیگرداند.
افزودن دستگیرهها پس از شروع روتر
شما میتوانید یک دستگیره جدید را زمانی که روتر در حال اجرا است اضافه کنید. برای این کار، باید AddNoPublisherHandler
یا AddHandler
را فراخوانی کرده و سپس RunHandlers
را فراخوانی کنید.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
متوقف کردن دستگیرههای در حال اجرا
میتوانید فقط یک دستگیره در حال اجرا را با صدا زدن Stop()
متوقف کنید.
توجه داشته باشید که روتر زمانی خاموش میشود که هیچ دستگیره در حال اجرا وجود نداشته باشد.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if the handler was stopped with the Stopped() function.
func (h *Handler) Stop() {
// ...
مدل اجرا
مشترکها میتوانند یک پیام را به ترتیب به صورت متوالی مصرف کنند یا چندین پیام را به صورت موازی.
-
جریان پیام تکی سادهترین روش است که به این معنی است که مشترکها هیچ پیام جدیدی دریافت نخواهند کرد تا زمانی که
msg.Ack()
فراخوانی شود. -
جریان پیامهای چندتایی توسط مشترکان خاصیت دارد. با اشتراک در همزمان چندین پارتیشن موضوع، میتوان چندین پیام را به صورت موازی مصرف کرد، حتی پیامهایی که قبلاً تأیید نشده بودند (برای مثال، چگونگی کار مشترکان Kafka). روتر این مدل را با اجرای
HandlerFunc
به صورت موازی پردازش میکند.
لطفاً به مستندات انتخابی Pub/Sub مراجعه کنید تا مدلهای اجرای پشتیبانیشده را درک کنید.
میانافزار
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware به ما امکان میدهد که چیزی مشابه یک تزئینکننده برای HandlerFunc بنویسیم.
// میتواند بعضی از عملیاتها را قبل (برای مثال، تغییر پیام مصرفی) یا بعد از دستگیره (تغییر پیام تولیدشده، تأیید/رد پیام مصرفی، مدیریت خطاها، ثبت نمودن و ...) اجرا کند.
// میتوان این میانافزار را با استفاده از متد `AddMiddleware` به روتر متصل کرد.
// مثال:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("executed before handler")
// producedMessages, err := h(message)
// fmt.Println("executed after handler")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
لیست کاملی از میانافزارهای استاندارد را در Middlewares
میتوانید پیدا کنید.
پلاگینها
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin یک تابع است که هنگام شروع روتر اجرا میشود.
type RouterPlugin func(*Router) error
// ...
لیست کاملی از پلاگینهای استاندارد را میتوانید در message/router/plugin پیدا کنید.
متن فنی
برخی مقادیر مفید در context
برای هر پیامی که توسط دستگاه پردازش گرفته شده ذخیره میشود:
کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx نام دستگاه پردازش پیام را که پیام از context گرفته شده است برمیگرداند.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx نام نوع انتشار کننده پیام در دستگاه پردازش پیام از context برمیگرداند.
// برای مثال، برای Kafka، `kafka.Publisher` خواهد بود.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx نام نوع مشترک پیام در دستگاه پردازش پیام از context برمیگرداند.
// برای مثال، برای Kafka، `kafka.Subscriber` خواهد بود.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx موضوعی که پیام از آن در دستگاه پردازش پیام گرفته شده است از context برمیگرداند.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx موضوعی که پیام به آن انتشار داده میشود از context در دستگاه پردازش پیام برمیگرداند.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...