Publisher اور Subscriber واٹرمل کے نچلے سطح کے حصے ہیں۔ عملی استعمال میں، عام طور پر آپ کو بلند سطح کے انٹرفیس اور فعل استعمال کرنا ہوتا ہے، جیسے کہ منسلک، میٹرکس، زہریلا پیغام کی قطاریں، دوبارہ کوشش، شرح محدودیت وغیرہ۔
کبھی کبھار، آپ کو پروسیسنگ کامیاب ہونے پر ایک ایک کائنات بھیجنا نہیں چاہیے ہوتا۔ کبھی کبھار، آپ کو ایک پیغام دوسرے پیغام کے پروسیس ہونے کے بعد بھیجنا ہوتا ہے۔
ان ضروریات کو پورا کرنے کے لیے، ایک عنصر موجود ہوتا ہے جس کو راوٹر کہتے ہیں۔
تشکیل
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout کو منظور کرتا ہے کے راستے پر کٹواو کرتے وقت راوٹر کتنی دیر تک ہینڈلرز کام کرے۔
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate راوٹر کی تشکیل میں کوئی خامی ہونے کا تائید کرتا ہے۔
func (c RouterConfig) Validate() error {
return nil
}
// ...
ہینڈلر
سب سے پہلے، آپ کو HandlerFunc
فعلانا ہوگا:
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc واٹرمل میسج حاصل ہونے پر بلایا جانے والا فنکشن ہے۔
//
// جب HandlerFunc ایک خطا واپس نہیں کرتا ہے، تو msg.Ack() خود بخود بلایا جائے گا۔
//
// جب HandlerFunc ایک خطا واپس کرتا ہے، تو msg.Nack() بلایا جائے گا۔
//
// جب handler میں msg.Ack() بلایا جاتا ہے اور HandlerFunc ایک خطا واپس کرتا ہے،
// تو msg.Nack() ارسال نہیں کیا جائے گا کیونکہ پہلے ہی Ack بھیج دیا گیا ہو گا۔
//
// جب متعدد پیغامات (msg.Ack() کے بھیجے جانے کی وجہ سے یا Subscriber کی متعدد صارفین کی حمایت ہونے کی بنا پر) حاصل ہوتے ہیں،
// تو HandlerFunc بے وقتی کے ساتھ ارتعاش سے اجراء کیا جائے گا۔
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
اگلے، آپ کو Router.AddHandler
استعمال کرکے نیا ہینڈلر شامل کرنا ہوگا:
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler نیا ہینڈلر شامل کرتا ہے۔
// handlerName کو یکتا ہونا چاہئے۔ فی الحال، صرف ڈیبگنگ کے لئے استعمال ہوتا ہے۔
// subscribeTopic وہ موضوع ہے جس پر ہینڈلر کو پیغامات ملیں گی۔
// publishTopic وہ موضوع ہے جس پر ہینڈلر کی واپسی پیغامات کو راول جی آر کے ذریعے پیدا کیا جائے گا۔
// جب ہینڈلر کو متعدد موضوعات پر راول کرنا ہو،
// تو صرف پبلشر کو ہینڈلر میں شامل کرنا یا میدل ویئر منظور کرتا ہوں،
// جو میٹا ڈیٹا کے بنیاد پر پیغامات کو اندراج کر سکتا ہے اور خاص موضوعات پر راول کرنا چاہئے۔
// اگر ہینڈلر اضافہ کیا جاتا ہے جب راولر پہلے ہی چل رہا ہے، تو RunHandlers() کو واضح طور پر بلایا جانا چاہئے۔
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Adding handler", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped is not always waiting for handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler نیا ہینڈلر شامل کرتا ہے۔
// یہ ہینڈلر پیغامات کو واپس نہیں کر سکتا۔
// جب یہ ایک پیغام واپس کرتا ہے، ایک خطا واقع ہوتی ہے اور ایک نیک بھیج دیا جاتا ہے۔
//
// handlerName کو یکتا ہونا چاہئے۔ فی الحال، صرف ڈیبگنگ کے لئے استعمال ہوتا ہے۔
// subscribeTopic وہ موضوع ہے جس پر ہینڈلر کو پیغامات ملیں گی۔
// subscriber ایک صارف ہے جو پیغامات کا استعمال کرنے کے لئے استعمال ہوتا ہے۔
// اگر ہینڈلر اضافہ کیا جاتا ہے جب راولر پہلے ہی چل رہا ہے، تو RunHandlers() کو واضح طور پر بلایا جانا چاہئے۔
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
"شروع کرنے" میں مثال کا استعمال دیکھنے کے لئے ریفرنس کریں۔ مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler ایک ہینڈلر واپس کرتا ہے جو ہینڈلر سطح کی مڈل ویئر یا ہینڈلرز کو روکنے کے لئے استعمال کیا جاسکتا ہے۔
handler := router.AddHandler(
"struct_handler", // ہینڈلر نام ، ضروری ہے کہ یہ یکتا ہو
"incoming_messages_topic", // موضوع جہاں واقعات پڑھے جاتے ہیں
pubSub,
"outgoing_messages_topic", // واقعات شائع کرنے کے لئے موضوع
pubSub,
structHandler{}.Handler,
)
// ہینڈلر سطح کی مڈل ویئر صرف مخصوص ہینڈلر کے لئے نافذ ہوتی ہے
// یہ مڈل ویئر اسی طرح سے جوان میڈل ویئر کی طرح شامل کی جا سکتی ہے۔
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("ہینڈلر کا خصوصی مڈل ویئر نافذ ہو رہا ہے ، پیغام یو ایو ڈی: ", message.UUID)
return h(message)
}
})
// ...
کوئی پبلشر ہینڈلر
ہر ہینڈلر نیا پیغام تیار نہیں کرے گا۔ آپ Router.AddNoPublisherHandler
استعمال کرکے اس قسم کے ہینڈلر شامل کرسکتے ہیں:
github.com/ThreeDotsLabs/watermill/message/router.go کامل سورس کوڈ:
// ...
// AddNoPublisherHandler نیا ہینڈلر شامل کرتا ہے۔
// یہ ہینڈلر پیغامات کو واپس نہیں کرسکتا۔
// جب یہ ایک پیغام کو واپس کرے تو ایک خرابی پیدا ہوگی اور Nack بھیجا جائے گا۔
//
// handlerName کو مضمون ہونا چاہئے اور فی الحال صرف تشخیصی مقاصد کے لئے استعمال ہوتا ہے۔
//
// subscribeTopic وہ موضوع ہے جس پر ہینڈلر پیغامات کو وصول کرے گا۔
//
// subscriber پیغامات کو استہل کرنے کے لئے استعمال ہوتا ہے۔
//
// اگر آپ ایک ہینڈلر کو ایک راؤٹر میں شامل کریں جو پہلے ہی چل رہا ہو، تو آپ کو واضح طور پر RunHandlers() کو کال کرنا ہوگا۔
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
تصدیق
بطور پہلے، جب HanderFunc
خرابی واپس نہیں کرتا ہے، تو msg.Ack()
بلایا جائے گا۔ اگر کوئی خرابی واپس کی جائے تو، msg.Nack()
کو بلایا جائے گا۔ لہذا پیغام کو ہینڈل کرنے کے بعد، آپ کو msg.Ack()
یا msg.Nack
نہیں بلانا ہوگا (البتہ، آپ چاہیں تو کرسکتے ہیں)۔
پیغامات کا تیاری کرنا
جب ہینڈلر دوبارہ نیا پیغام بھیجتا ہے، تو یاد رکھیں کہ بہت سارے پبلشر انفرادی پیغامات کو ایک ساتھ ڈالنے کی حمایت نہیں کرتے۔ اگر بروکر یا اسٹوریج دستیاب نہ ہوں تو صرف کچھ پیغامات تیار ہوں گے اور msg.Nack()
بھیجا جائے گا۔
اگر یہ ایک مسئلہ ہے تو، تو نظرثانی کریں کہ ہر ہینڈلر صرف ایک پیغام بھیجے۔
راؤٹر چلانا
راؤٹر کو چلانے کے لئے آپ کو Run()
کو کال کرنا ہوگا۔
github.com/ThreeDotsLabs/watermill/message/router.go کامل سورس کوڈ:
// ...
// Run تمام پلگ امنیٹس اور ہینڈلرز کو چلاتا ہے، اور دی گئی موضوعات کے ساتھ سبسکرائب کرتا ہے۔
// یہ کال راؤٹر چلتے وقت بلاک ہوگی۔
//
// جب سارے ہینڈلر بند ہوجائیں گے (مثال کے طور پر کونکے سبسکرپشن بند ہوگیا ہو)، تو راؤٹر بھی رک جائے گا۔
//
// Run() کو روکنے کے لئے، آپ کو راؤٹر پر Close() کو کال کرنا ہوگا۔
//
// ctx کو سب مشترکوں تک منتقل کیا جائے گا۔
//
// جب سارے ہینڈلر بند ہوجائیں گے (مثلا: بند کنکشنز کے باعث)، تو Run() بھی روک جائے گی۔
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
یقینی بنانا کہ راؤٹر چل رہا ہے
راؤٹر کا چل رہا ہے یہ معلوم کرنا فائدہ مند ہوسکتا ہے۔ آپ Running()
کے ذریعے اس کا استعمال کرکے یہ حاصل کرسکتے ہیں۔
github.com/ThreeDotsLabs/watermill/message/router.go کامل سورس کوڈ:
// ...
// Running راؤٹر چل رہا ہونے پر بند ہوتا ہے۔
// یعنی، آپ راؤٹر کے چلتے ہوئے ہونے کا انتظار کرسکتے ہیں:
// fmt.Println("راؤٹر کا آغاز")
// go r.Run(ctx)
// // fmt.Println("راؤٹر چل رہا ہے")
// تنبیہ: تاریخی وجوہات کی بنا پر، یہ چینل راؤٹر کی بند ہونے کا علم نہیں رکھتا - یہ اس صورت میں بند ہوگا جب راؤٹر چلتا رہے اور بعد میں بند ہوجاءے۔
func (r *Router) Running() chan struct{} {
// ...
}
آپ IsRunning
کا استعمال بھی کرسکتے ہیں، جو ایک بولیئن قیمت واپس کرتا ہے:
github.com/ThreeDotsLabs/watermill/message/router.go کامل سورس کوڈ:
// ...
// IsRunning راؤٹر چل رہا ہونے پر ٹریو ہوگا۔
//
// تنبیہ: تاریخی وجوہات کی بنا پر، یہ میتھڈ راؤٹر کی بندی کی حالت کا علم نہیں رکھتی ہے۔
// اگر آپ یہ جاننا چاہتے ہیں کہ راؤٹر بند ہوگیا ہے، تو IsClosed کا استعمال کریں۔
func (r *Router) IsRunning() bool {
// ...
}
راوٹر بند کریں
راوٹر بند کرنے کے لئے، آپ کو Close()
کو کال کرنا ہوگا۔
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
تمام پبلشرز اور سبسکرائبرز کو بند کرے گا، اور تمام ہینڈلرز کو مکمل ہونے کا انتظار کرے گا۔
Close()
تنظیم میں فراہم کردہ ٹائم آؤٹ کے ساتھ وقفہ کے لئے انتظار کرے گا۔ اگر ٹائم آؤٹ ہوجائے تو، Close()
ایک خرابی واپس کرے گا۔
راوٹر شروع ہونے کے بعد ہینڈلرز شامل کرنا
آپ راوٹر شروع ہونے کے بعد نیا ہینڈلر شامل کرسکتے ہیں۔ اس کے لئے، آپ کو AddNoPublisherHandler
یا AddHandler
کو کال کرکے، پھر RunHandlers
کو کال کرنا ہوگا۔
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
چل رہے ہینڈلرز کی روکدو
آپ Stop()
کو کال کرکے صرف ایک چل رہا ہینڈلر روک سکتے ہیں۔
براہ کرم نوٹ کریں کہ راوٹر جب بھی کوئی چل رہے ہینڈلر نہ ہو تو بند ہوجائے گا۔
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if the handler was stopped with the Stopped() function.
func (h *Handler) Stop() {
// ...
نفاذی ماڈل
سبسکرائبرز ایک میسج کو متواتر ترتیب سے یا متواتر میسجز کو ایک ساتھ کھینچ سکتے ہیں۔
-
انفرادی میسج فلو سب سادہ ترین طریقہ ہے، جس کا مطلب ہے کہ سبسکرائبرز کو نئے میسجس نہیں ملیں گے جب تک
msg.Ack()
کال نہیں کیا جاتا۔ -
متعدد میسج فلو کے صرف مخصوص سبسکرائبرز کو حمایت دی جاتی ہے۔ مختلف ٹاپک پارشنز کو ایک ساتھ سبسکرائب کر کے، متعدد میسجز متواتر میں کھینچے جا سکتے ہیں، حتی کہ وہ میسجز جو پہلے سے تسلیم نہیں کئے گئے تھے (مثلاً، کیسے کافکا سبسکرائبرز کام کرتے ہیں)۔ راوٹر یہ ماڈل عمل میں لانے کے لئے
HandlerFunc
کو متوازی میں چلاتا ہے۔
براہ کرم منتخب شدہ Pub/Sub دستاویزات پر رجوع کریں تاکہ حمایت دیے گئے نفاذی ماڈلز کو سمجھ سکیں۔
مڈیول وئر
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware allows us to write something similar to a decorator for HandlerFunc.
// It can execute some operations before (e.g., modify the consumed message) or after the handler (modify the generated message, ack/nack the consumed message, handle errors, log, etc.).
//
// It can be attached to the router using the `AddMiddleware` method.
//
// Example:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("executed before handler")
// producedMessages, err := h(message)
// fmt.Println("executed after handler")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
معیاری مڈیول کی مکمل فہرست Middlewares میں دستیاب ہو سکتی ہے۔
پلگ ان
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin is a function executed when the router starts.
type RouterPlugin func(*Router) error
// ...
معیاری پلگ ان کی مکمل فہرست message/router/plugin میں دستیاب ہو سکتی ہے۔
متن
ہینڈلر کے دوران ہر پیغام جو ہینڈلر کو موصول ہوتا ہے، میں context
میں کچھ مفید قیمتیں محفوظ ہوتی ہیں۔
مکمل ماخذ کوڈ: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx میسج ہینڈلر کا نام واپس کرتا ہے جو کہ میسج ہینڈلر نے کنسیوم کیا گیا ہے
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx میسج پبلشر کی قسم کا نام واپس کرتا ہے جو کہ میسج راوٹر سے کنٹیکسٹ میں ہے
// مثلاً، کافکے کے لئے، یہ `kafka.Publisher` ہوتا ہے۔
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx میسج سبسکرائبر کی قسم کا نام واپس کرتا ہے جو کہ میسج راوٹر سے کنٹیکسٹ میں ہے
// مثلاً، کافکے کے لئے، یہ `kafka.Subscriber` ہوتا ہے۔
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx کنٹیکسٹ سے واپس تازہ کنا ٹاپک والا تازہ ہوا تازہ جو کہ میسج سبسکرائب ہوا ہے۔
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx کنٹیکسٹ سے واپس تازہ تازہ ٹاپک والا تازہ کرتا ہے جو کہ میسج پبلش کرنا چاہتا ہے۔
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...