Publisher and Subscriber ওয়াটারমিলের নিচের স্তরের অংশ। ব্যবহারকারীরা প্রায়শই উচ্চস্তরের ইন্টারফেস এবং ফাংশন ব্যবহার করতে চান, যেমন অ্যাসোসিয়েশন, মেট্রিক্স, পয়জন মেসেজ কিউ, পুনরায় চেষ্টা, হার সীমাবদ্ধতা ইত্যাদি।
কিছু সময় প্রক্রিয়া সফল হওয়ার সময় আপেক্ষিক করে আক পাঠাতে চাইতে পারেন। কিছু সময় একটি বার্তা প্রসেস হওয়ার পরে আরেকটি বার্তা প্রেরণ করতে চাইতে পারেন।
এই চাহিদাগুলি পূরণের জন্য, রাউটার নামক একটি উপাদান আছে।
কনফিগারেশন
সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// শান্তি চলাকাল নির্ধারণ করে এই রাউটারটি হ্যান্ডলারদের জন্য কখন বন্ধ করা উচিৎ।
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// রাউটার কনফিগারেশনে কোনও ত্রুটি আছে কিনা তা পরীক্ষা করে।
func (c RouterConfig) Validate() error {
return nil
}
// ...
হ্যান্ডলার
প্রথমে, আপনার আবশ্যক হবে HandlerFunc
ফাংশনটি ইমপ্লিমেন্ট করা:
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc হল এমন একটি ফাংশন যা মেসেজ প্রাপ্ত হওয়ার সময় কল হয়।
//
// যখন HandlerFunc কোন ত্রুটি রিটার্ন না করে, তখন msg.Ack() স্বয়ংক্রিয়ভাবে কল হবে।
//
// যখন HandlerFunc কোন ত্রুটি রিটার্ন করে, msg.Nack() কল হবে।
//
// যখন হ্যান্ডলারে msg.Ack() কল হয় এবং HandlerFunc কোন ত্রুটি রিটার্ন করে,
// তখন ত্রুটি ইতিমধ্যে প্রেরিত হয়েছে because Ack has already been sent।
//
// যখন একাধিক মেসেজ প্রাপ্তি করা হয় (msg.Ack() হওয়ার কারণে অথবা সাবস্ক্রাইবার মৌলিকভাবে একাধিক উপভোক্তা সমর্থন করা),
// HandlerFunc সমদদ্টে অভিবাক্যা পারবে।
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
পরবর্তীতে, আপনার আবশ্যক হবে Router.AddHandler
ব্যবহার করে একটি নতুন হ্যান্ডলার যোগ করার জন্য:
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler একটি নতুন হ্যান্ডলার যোগ করে।
// handlerName অবশ্যই অনন্য হতে হবে। বর্তমানে, এটি মুক্তি পাতানোর জন্য শুধুমাত্র ডিবাগিং এর জন্য ব্যবহৃত।
// subscribeTopic হল টপিক যা হ্যান্ডলার মেসেজ প্রাপ্ত করবে।
// publishTopic হল টপিক যেখানে হ্যান্ডলারের রিটার্নের মেসেজ রাউটার দ্বারা উৎপন্ন হবে।
// যখন হ্যান্ডলারে অনেকগুলি টপিকে পাবলিশ করা প্রয়োজন হয়,
// এর জন্য শুধুমাত্র পাবলিশারকে হ্যান্ডলারে ইনজেক্ট বা মিডলওয়্যার ইমপ্লিমেন্ট করা উচিত,
// যা মেটাডেটা ভিত্তিতে মেসেজ ধরতে এবং নিশ্চিত টপিকে প্রকাশ করতে পারে।
// যদি রাউটার চালু অবস্থাতে একটি হ্যান্ডলার যোগ করা যেতেছে, তবে RunHandlers() স্পষ্টভাবে কল করা আবশ্যক।
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Adding handler", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped is not always waiting for handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler একটি নতুন হ্যান্ডলার যোগ করে।
// এই হ্যান্ডলার মেসেজ রিটার্ন করতে পারে না।
// যখন এটি একটি মেসেজ রিটার্ন করে, তখন ত্রুটি ঘটয়ে এবং একটি Nack প্রেরিত হয়।
//
// handlerName অবশ্যই অনন্য হতে হবে। বর্তমানে, এটি মুক্তি পাতানোর জন্য শুধুমাত্র ডিবাগিং এর জন্য ব্যবহৃত।
// subscribeTopic হল টপিক যা হ্যান্ডলার মেসেজ প্রাপ্ত করবে।
// subscriber হল মেসেজ গ্রহণ করতে ব্যবহৃত সাবস্ক্রাইবার।
// যদি রাউটার চালু অবস্থাতে একটি হ্যান্ডলার যোগ করা যেতেছে, তবে RunHandlers() স্পষ্টভাবে কল করা আবশ্যক।
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
"শুরু করার জন্য উদাহরণ ব্যবহার" এ প্রয়োজনীয় উদাহরণে রেফারেন্স দেওয়া আছে। সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler হ্যান্ডলার যোগ করে যা ব্যবহার করা যাবে যেহেতু হ্যান্ডলার লেভেলের মিডলওয়্যার বা হ্যান্ডলার বন্ধ করতে ব্যবহার করা যাবে।
handler := router.AddHandler(
"struct_handler", // ব্যবহারকারী নেম, অবশ্যই অনন্য
"incoming_messages_topic", // যে থেকে ইভেন্টগুলি পড়া হয়
pubSub,
"outgoing_messages_topic", // ইভেন্ট প্রকাশ করার টপিক
pubSub,
structHandler{}.Handler,
)
// হ্যান্ডলার লেভেলের মিডলওয়্যার খালি কেবল নির্দিষ্ট হ্যান্ডলারদের জন্য পালন করা হয়
// এই মিডলওয়্যারটি রাউটার লেভেল মিডলওয়্যার যেভাবে যোগ করা যায়
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("হ্যান্ডলার বিশেষ মিডলওয়্যার পরিষ্কার, মেসেজ UUID: ", message.UUID)
return h(message)
}
})
// ...
কোন প্রকাশক হ্যান্ডলার নেই
প্রতিটি হ্যান্ডলার নতুন একটি বার্তা তৈরি করে যায় না। আপনি Router.AddNoPublisherHandler
ব্যবহার করে এই ধরনের হ্যান্ডলার যুক্ত করতে পারেন:
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler একটি নতুন হ্যান্ডলার যুক্ত করে।
// এই হ্যান্ডলার বার্তা ফিরিয়ে দিতে পারবে না।
// যখন এটি একটি বার্তা ফিরে, তখন একটি ত্রুটি ঘটবে এবং Nack পাঠানো হবে।
//
// handlerName অবশ্যই অনন্য হতে হবে এবং বর্তমানে শুধুমাত্র ডিবাগিং উদ্দেশ্যে ব্যবহৃত।
//
// subscribeTopic হল ঐ বিষয় যেখানে হ্যান্ডলার বার্তা গ্রহণ করবে।
//
// subscriber বার্তা গ্রহণ করার জন্য ব্যবহৃত হয়।
//
// যদি রাউটারে বর্তমানে রান হয়া গ্যাছে, তবে আপনাকে প্রাসঙ্গিকভাবে RunHandlers() কল করতে হবে।
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
প্রশংসার্থে
ডিফল্টভাবে, যখন HandlerFunc
কোনো ত্রুটি ফেরত পাঠায় না, তখন msg.Ack()
কল করা হবে। যদি একটি ত্রুটি ফেরত পাঠায়, তাহলে msg.Nack()
কল করা হবে। তাই, বার্তা নিষ্পত্তি করার পরে, আপনাকে msg.Ack()
বা msg.Nack
কল করতে দরকার নেই (প্রকৃতপক্ষে, আপনি যদি চান তাহলে করতে পারেন)।
বার্তা তৈরি করা
যখন হ্যান্ডলার দ্বারা একাধিক বার্তা ফিরিয়ে দেয়া হয়, তাহলে অনুগ্রহ করে মনে রাখবেন যে অনেক Publisher সংজ্ঞানা নেবার সময় বার্তা সর্বসময় পরম প্রকৃতভাবে না সমর্থন করে। যদি ব্রোকার বা স্টোরেজ অনুপস্থিত হয়, তাহলে কিছু বার্তা মাত্র তৈরি হবে এবং msg.Nack()
পাঠানো হবে।
যদি এটা একটি সমস্যা হয়, তাহলে বিবেচনা করুন যে প্রতিটি হ্যান্ডলার কেবল একটি বার্তা তৈরি করতে হবে।
রাউটার চালানো
রাউটার চালানোর জন্য, আপনাকে Run()
কল করতে হবে।
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run সমস্ত প্লাগইন এবং হ্যান্ডলার চালু করে এবং নির্ধারিত টপিকে সাবস্ক্রাইব করতে শুরু করে।
// এই কল দেওয়া হলে, রাউটার চালু থাকা অবস্থায় ব্লওক করবে।
//
// সমস্ত হ্যান্ডলার বন্ধ (উদাহরণস্বরূপ: ভিড়যাল বন্ধ হওয়া) হলে, রাউটার ওই বন্ধ হবে।
//
// রাউটার বন্ধ করতে, আপনাকে রাউটারে Close() কল করতে হবে।
//
// ctx সমস্ত সাবস্ক্রাইবারগুলোতে প্রসারিত করা হবে।
//
// যখন সমস্ত হ্যান্ডলার বন্ধ (উদাহরণস্বরূপ: বন্ধ সংযোগের জন্য) হবে, রাউটার ওই সংযোগটি বন্ধ করবে।
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
নিশ্চিতকরণ করা যে, রাউটার চালানো হচ্ছে
রাউটার চালানো হয়েছে কিনা বুঝতে অফুল: জন্য ব্যবহার করা যায় Running()
মেথড এর মাধ্যমে।
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running রাউটার চালু হলে বন্ধ করে।
// অন্য শব্দে, আপনি এভাবে আপেক্ষিক করতে পারেন যে রাউটার চালু হয়েছে:
// fmt.Println("Starting router")
// go r.Run(ctx)
// // fmt.Println("Router is running")
// সতর্কতা: ইতিহাসের অবস্থানই কারণে, এই চ্যানেলটি জানে না রাউটারের শাটডাউন সম্পর্কে - এটি যদি রাউটার চালু থাকে এবং তারপর বন্ধ হয়, তাহলে বন্ধ হবে।
func (r *Router) Running() chan struct{} {
// ...
}
আপনি এছাড়া IsRunning
ফাংশন ব্যবহার করতে পারেন যা একটি বুলিয়ান মান ফেরত পাঠায়:
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning রাউটার চালু থাকলে true ফেরত পাঠায়।
//
// সতর্কতা: ইতিহাসের অবস্থানই সমস্যার কারণ, এই পদ্ধতিতে বন্ধ রাউটারের স্থিতি জানে না।
// আপনি যদি জানতে চান যে রাউটার বন্ধ করা হয়েছে কি না, তাহলে ব্যবহার করুন IsClosed।
func (r *Router) IsRunning() bool {
// ...
}
রাউটার বন্ধ করুন
রাউটার বন্ধ করতে, আপনাকে Close()
কল করতে হবে।
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
সমস্ত পাবলিশার এবং সাবস্ক্রাইবারকে বন্ধ করবে এবং সমস্ত হ্যান্ডলার সম্পন্ন হওয়ার জন্য অপেক্ষা করবে।
Close()
কনফিগারেশনে সেট করা টাইমআউটের অপেক্ষা করবে। যদি টাইমআউট অর্জন করা হয়, তবে Close()
একটি ত্রুটি রিটার্ন করবে।
রাউটার চালু করার পরে হ্যান্ডলার যোগ করা
রাউটার চালু থাকার পরে আপনি নতুন হ্যান্ডলার যোগ করতে পারেন। এটা করতে, আপনাকে AddNoPublisherHandler
বা AddHandler
কল করতে হবে, এবং তারপর RunHandlers
কল করতে হবে।
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
চলমান হ্যান্ডলার বন্ধ করা
আপনি Stop()
কল করে শুধুমাত্র একটি চলমান হ্যান্ডলার বন্ধ করতে পারেন।
দয়া করে মনীষ করুন যে, রাউটারে কোনো চলমান হ্যান্ডলার থাকলে রাউটার বন্ধ হবে।
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if the handler was stopped with the Stopped() function.
func (h *Handler) Stop() {
// ...
বৈষম্য মডেল
সাবস্ক্রাইবার একটি একক বার্তা সিকোয়েনসিয়ালি বা একাধিক বার্তা সাথে পারালেলভাবে কনসিউম করতে পারে।
-
একক বার্তা প্রবাহ হল সবচেয়ে সহজ পদ্ধতি, যা অর্থ যে সাবস্ক্রাইবাররা
msg.Ack()
কল করা না হওয়া পর্যন্ত কোনো নতুন বার্তা পাবে না। -
একাধিক বার্তা প্রবাহ কেবলমাত্র কিছু সাবস্ক্রাইবার দ্বারা সমর্থিত। একাধিক টপিক পার্টিশনে সাবস্ক্রাইব করে, একাধিক বার্তা পারালেলভাবে কনসিউম করা যায়, হাতযুক্ত ছিলেন না আবার পেয়েছিলেন না (উদাহরণস্বরূপ, কিভাবে কাফকা সাবস্ক্রাইবাররা কাজ করে)। রাউটার এই মডেল প্রসেস করে প্যারালেলে চালানোর সাথেই
HandlerFunc
চালায়।
সমর্থিত বৈষম্য মডেলগুলি বুঝতে, অনুগ্রহ করে নির্বাচিত পাব/সাব ডকুমেন্টেশনে মুখ্যমূল্য দিন।
মিডলওয়্যার
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware আমাদের স্থানীয় জন্য একটি ডেকোরেটরের মতো লিখতে অনুমতি দেয়।
// এটি কিছু অপারেশন বুঝাতে পারে প্রিয় হ্যান্ডলারের পূর্বে (উः কনসিউম মেসেজ সম্পাদনা), বা হ্যান্ডলারের পরে (উঃ জেনারেটেড মেসেজ সম্পাদনा, ঐক্য/নাক কনসিউম মেসেজ, ভুল হ্যান্ডল করা, লগ, ইত্যাদি)
//
// এটি রাউটারে যোগ করার জন্য `AddMiddleware` মেথড দিয়ে সংযুক্ত করা যেতে পারে।
//
// উদাহরণ:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("executed before handler")
// producedMessages, err := h(message)
// fmt.Println("executed after handler")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
মানক মিডলওয়্যারের সম্পূর্ণ তালিকা Middlewares থেকে পাওয়া যায়।
প্লাগইন
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin হল একটি ফাংশন যা রাউটার চালু করা হলে সম্পাদিত হবে।
type RouterPlugin func(*Router) error
// ...
স্ট্যান্ডার্ড প্লাগইনের সম্পূর্ণ তালিকা message/router/plugin থেকে পাওয়া যায়।
সংদর্ভ
হ্যান্ডলার দ্বারা প্রাপ্ত প্রতিটি বার্তার জন্য সংদর্ভে
কিছু গুরুত্বপূর্ণ মান সংরক্ষিত আছে:
সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx ফাংশনটি সংদর্ভ থেকে বার্তার হ্যান্ডলারের নাম প্রস্তুত করে যেটি বার্তা আসলো সেখান থেকে।
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx ফাংশনটি সংদর্ভ থেকে রাউটারে বার্তা প্রকাশের ধরণের নাম প্রাপ্ত করে।
// উদাহরণস্বরুপ, কাফকা বিষয়ে, এটি `kafka.Publisher` হবে।
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx ফাংশনটি সংদর্ভ থেকে রাউটারে বার্তা গ্রহণের ধরণের নাম প্রাপ্ত করে।
// উদাহরণস্বরুপ, কাফকা বিষয়ে, এটি `kafka.Subscriber` হবে।
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx ফাংশনটি সংদর্ভ থেকে বার্তা গ্রহণ করা টপিক প্রাপ্ত করে।
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx ফাংশনটি সংদর্ভ থেকে বার্তা প্রকাশ করার টপিক প্রাপ্ত করে।
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...