Publisher و Subscriber بخش‌های پایینی Watermill هستند. در برنامه‌های عملیاتی، معمولاً می‌خواهید از رابط‌ها و توابع سطح بالا مانند ارتباطات، متریک‌ها، صف‌های پیام سم زده، تلاش مجدد، محدودیت نرخ و غیره استفاده کنید.

گاهی اوقات، ممکن است بخواهید هنگام پردازش موفق بدون Ack ارسال نکنید. گاهی اوقات، ممکن است بخواهید پیامی را پس از پردازش پیام دیگری ارسال کنید.

برای برآورده کردن این نیازها، یک مولفه به نام Router وجود دارد.

Watermill Router

تنظیمات

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout تعیین می‌کند که روتر باید چه مدت برای دستگیره‌ها کار کند هنگام بسته شدن.
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate بررسی می‌کند که آیا در پیکربندی روتر هر گونه خطا وجود دارد یا خیر.
func (c RouterConfig) Validate() error {
	return nil
}
// ...

دستگیره

اولین کاری که باید انجام دهید، پیاده‌سازی تابع HandlerFunc می‌باشد:

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc تابعی است که هنگام دریافت یک پیام فراخوانی می‌شود.
// 
// زمانی که تابع HandlerFunc خطا ندارد، خودکار msg.Ack() فراخوانی خواهد شد.
// 
// زمانی که تابع HandlerFunc یک خطا برمی‌گرداند، msg.Nack() فراخوانی خواهد شد.
// 
// زمانی که msg.Ack() در داخل دستگیره فراخوانی شود و تابع HandlerFunc یک خطا برمی‌گرداند،
// msg.Nack() ارسال نخواهد شد زیرا Ack از پیش ارسال شده است.
// 
// زمانی که چندین پیام دریافت می‌شود (به دلیل ارسال msg.Ack() در HandlerFunc یا Subscriber که از چندین مصرف‌کننده پشتیبانی می‌کند)،
// HandlerFunc به طور همزمان اجرا خواهد شد.
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

بعداً باید از Router.AddHandler استفاده کرده و یک دستگیره جدید اضافه کنید:

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler یک دستگیره جدید اضافه می‌کند.
// 
// handlerName باید یکتا باشد. در حال حاضر فقط برای اشکال‌زدایی استفاده می‌شود.
// 
// subscribeTopic، موضوعی است که دستگیره پیام‌ها را از آن دریافت می‌کند.
// 
// publishTopic، موضوعی است که پیام‌های بازگشتی دستگیره توسط Router تولید می‌شوند.
// 
// هنگامی که دستگیره نیاز به انتشار در چندین موضوع دارد،
// توصیه می‌شود فقط Publisher را به دستگیره وارد کنید یا middleware پیاده‌سازی کنید،
// که می‌تواند پیام‌ها را بر اساس فهرستی از متادیتاها گرفته و آن‌ها را در موضوع‌های خاص منتشر کند.
// 
// اگر یک دستگیره در حال اجرا است در هنگام اضافه کردن دستگیره، باید به صورت صریح RunHandlers() صدا زده شود.
func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("Adding handler", watermill.LogFields{

		"handler_name": handlerName,

		"topic":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		name:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped تا همیشه منتظر handlerAdded نیست
	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler یک دستگیره جدید اضافه می‌کند.

// این دستگیره نمی‌تواند پیام‌ها را بازگشتی کند.

// زمانی که یک پیام بازگشتی می‌دهد، یک خطا رخ می‌دهد و یک Nack ارسال می‌شود.

//

// handlerName باید یکتا باشد. در حال حاضر فقط برای اشکال‌زدایی استفاده می‌شود.

// subscribeTopic، موضوعی است که دستگیره پیام‌ها را از آن دریافت می‌کند.

// subscriber، یک مشترک استفاده شده برای مصرف پیام‌ها است.

// اگر یک دستگیره در حال اجرا است در هنگام اضافه کردن دستگیره، باید به صورت صریح RunHandlers() صدا زده شود.
func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

برای مثال استفاده شده در "شروع کار" ارجاع دهید. کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler یک handler برمی‌گرداند که می‌توان از آن برای افزودن middleware سطح handler استفاده کرد یا handler‌ها را متوقف کرد.
	handler := router.AddHandler(
		"struct_handler",          // نام handler که باید یکتا باشد
		"incoming_messages_topic", // topic از جایگاهی که رویدادها خوانده می‌شوند
		pubSub,
		"outgoing_messages_topic", // topic برای انتشار رویدادها
		pubSub,
		structHandler{}.Handler,
	)

	// middleware سطح handler فقط برای handler‌های خاص اجرا می‌شود
	// این middleware می‌تواند به همان روش middleware سطح router اضافه شود
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("اجرای middleware خاص handler، UUID پیام: ", message.UUID)

			return h(message)
		}
	})
// ...

بدون دستکاری گر

هر دستکاری گری پیام جدیدی ایجاد نخواهد کرد. می‌توانید از Router.AddNoPublisherHandler برای افزودن این نوع دستکاری گر استفاده کنید:

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler یک دستکاری گر جدید اضافه می‌کند.
// این دستکاری گر نمی‌تواند پیام‌ها را برگرداند.
// وقتی یک پیام برگردانده شود، یک خطا رخ می‌دهد و Nack ارسال می‌شود.
//
// handlerName باید یکتا باشد و در حال حاضر تنها برای اهداف اشکال زدایی استفاده می‌شود.
//
// subscribeTopic موضوعی است که دستکاری گر دریافت پیام‌ها را دریافت می‌کند.
//
// subscriber برای مصرف پیام‌ها استفاده می‌شود.
//
// اگر یک دستکاری گر را به یک مسیری که در حال اجرا است اضافه کنید، باید به طور صریح RunHandlers() را فراخوانی کنید.
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

تشکر

به طور پیش‌فرض، وقتی HanderFunc خطا برنگرداند، msg.Ack() فراخوانی خواهد شد. اگر یک خطا برگردانده شود، msg.Nack() فراخوانی خواهد شد. بنابراین، پس از بررسی پیام، شما نیاز به فراخوانی msg.Ack() یا msg.Nack ندارید (البته، اگر بخواهید می‌توانید).

ایجاد پیام‌ها

در صورت بازگرداندن چندین پیام توسط دستکاری گر، لطفاً توجه داشته باشید که اکثر پیاده‌سازی‌های Publisher از انتشار اتمی پیام‌ها پشتیبانی نمی‌کنند. اگر بروکر یا فضای ذخیره‌سازی در دسترس نباشد، فقط برخی از پیام‌ها ایجاد خواهند شد و msg.Nack() ارسال خواهد شد.

اگر این یک مشکل است، در نظر داشته باشید که هر دستکاری گر فقط یک پیام را انتشار دهد.

اجرا کردن Router

برای اجرای روتر، باید Run() را فراخوانی کنید.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run تمام افزونه‌ها و دستکاری گرها را اجرا می‌کند و شروع به مشترک شدن در موضوعات داده شده می‌کند.
// این فراخوانی در حالی که روتر در حال اجرا است، مسدود می‌شود.
//
// وقتی تمام دستکاری گرها متوقف می‌شوند (به عنوان مثال به دلیل بسته شدن اشتراک)، روتر همچنین متوقف خواهد شد.
//
// برای متوقف کردن Run()، شما باید Close() را برای روتر فراخوانی کنید.
//
// ctx به تمام مشترک‌ها منتقل می‌شود.
//
// وقتی تمام دستکاری گرها متوقف می‌شوند (مثلاً به دلیل اتصالات بسته شده)، Run() همچنین متوقف خواهد شد.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

اطمینان از اجرای روتر

درک این که آیا روتر در حال اجرا است ممکن است مفید باشد. با استفاده از روش Running() می‌توانید به این هدف دست یابید.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running زمانی بسته می‌شود که روتر در حال اجرا است.
// به عبارت دیگر، شما می‌توانید صبر کنید تا روتر اجرا شود، به این صورت:

// 	fmt.Println("شروع روتر")
//	go r.Run(ctx)
//	//	fmt.Println("روتر در حال اجرا است")

// هشدار: به دلایل تاریخی، این کانال از بستن وضعیت خاتمه دهنده روتر مطلع نمی‌شود - اگر روتر ادامه دهد و سپس خاموش شود، بسته خواهد شد.
func (r *Router) Running() chan struct{} {
// ...
}

همچنین می‌توانید از تابع IsRunning استفاده کنید که مقدار بولین را برمی‌گرداند:

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning زمانی true بر می‌گرداند که روتر در حال اجرا است.
//
// هشدار: به دلایل تاریخی، این متد درباره وضعیت بسته شده روتر آگاه نیست.
// اگر می‌خواهید بدانید آیا روتر بسته شده است، از IsClosed استفاده کنید.
func (r *Router) IsRunning() bool {
// ...
}

خاموش کردن روتر

برای خاموش کردن روتر، باید Close() را صدا بزنید.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() تمام انتشارکننده‌ها و مشترک‌ها را خاموش می‌کند و منتظر تکمیل شدن تمام دستگیری‌ها می‌ماند.

Close() منتظر می‌ماند تا زمان مقرر شده در RouterConfig.CloseTimeout در تنظیمات گذشته شود. اگر زمان مقرر برسد، Close() یک خطا برمی‌گرداند.

افزودن دستگیره‌ها پس از شروع روتر

شما می‌توانید یک دستگیره جدید را زمانی که روتر در حال اجرا است اضافه کنید. برای این کار، باید AddNoPublisherHandler یا AddHandler را فراخوانی کرده و سپس RunHandlers را فراخوانی کنید.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

متوقف کردن دستگیره‌های در حال اجرا

می‌توانید فقط یک دستگیره در حال اجرا را با صدا زدن Stop() متوقف کنید.

توجه داشته باشید که روتر زمانی خاموش می‌شود که هیچ دستگیره در حال اجرا وجود نداشته باشد.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if the handler was stopped with the Stopped() function.
func (h *Handler) Stop() {
// ...

مدل اجرا

مشترک‌ها می‌توانند یک پیام را به ترتیب به صورت متوالی مصرف کنند یا چندین پیام را به صورت موازی.

  • جریان پیام تکی ساده‌ترین روش است که به این معنی است که مشترک‌ها هیچ پیام جدیدی دریافت نخواهند کرد تا زمانی که msg.Ack() فراخوانی شود.

  • جریان پیام‌های چندتایی توسط مشترکان خاصیت دارد. با اشتراک در همزمان چندین پارتیشن موضوع، می‌توان چندین پیام را به صورت موازی مصرف کرد، حتی پیام‌هایی که قبلاً تأیید نشده بودند (برای مثال، چگونگی کار مشترکان Kafka). روتر این مدل را با اجرای HandlerFunc به صورت موازی پردازش می‌کند.

لطفاً به مستندات انتخابی Pub/Sub مراجعه کنید تا مدل‌های اجرای پشتیبانی‌شده را درک کنید.

میان‌افزار

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware به ما امکان می‌دهد که چیزی مشابه یک تزئین‌کننده برای HandlerFunc بنویسیم. 
// می‌تواند بعضی از عملیات‌ها را قبل (برای مثال، تغییر پیام مصرفی) یا بعد از دستگیره (تغییر پیام تولید‌شده، تأیید/رد پیام مصرفی، مدیریت خطاها، ثبت نمودن و ...) اجرا کند.

// می‌توان این میان‌افزار را با استفاده از متد `AddMiddleware` به روتر متصل کرد.

// مثال:
// 
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("executed before handler")
// 			producedMessages, err := h(message)
// 			fmt.Println("executed after handler")
// 
// 			return producedMessages, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

لیست کاملی از میان‌افزارهای استاندارد را در Middlewares می‌توانید پیدا کنید.

پلاگین‌ها

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin یک تابع است که هنگام شروع روتر اجرا می‌شود.
type RouterPlugin func(*Router) error

// ...

لیست کاملی از پلاگین‌های استاندارد را می‌توانید در message/router/plugin پیدا کنید.

متن فنی

برخی مقادیر مفید در context برای هر پیامی که توسط دستگاه پردازش گرفته شده ذخیره می‌شود:

کد منبع کامل: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx نام دستگاه پردازش پیام را که پیام از context گرفته شده است برمی‌گرداند.
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx نام نوع انتشار کننده پیام در دستگاه پردازش پیام از context برمی‌گرداند.
// برای مثال، برای Kafka، `kafka.Publisher` خواهد بود.
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx نام نوع مشترک پیام در دستگاه پردازش پیام از context برمی‌گرداند.
// برای مثال، برای Kafka، `kafka.Subscriber` خواهد بود.
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx موضوعی که پیام از آن در دستگاه پردازش پیام گرفته شده است از context برمی‌گرداند.
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx موضوعی که پیام به آن انتشار داده می‌شود از context در دستگاه پردازش پیام برمی‌گرداند.
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...