الناشر والمشترك هما الأجزاء على مستوى الطبقة السفلى من Watermill. في التطبيقات العملية، غالبًا ما ترغب في استخدام واجهات ووظائف مستوى عالٍ، مثل التجمعات والمقاييس وقوائم الرسائل المسمومة وإعادة المحاولة وتحديد معدل الحد الأقصى وما إلى ذلك.
أحيانًا، قد لا ترغب في إرسال تأكيد عند نجاح المعالجة. وأحيانًا، قد ترغب في إرسال رسالة بعد معالجة رسالة أخرى.
لاستيفاء هذه المتطلبات، يوجد مكون يُسمى موجِّه.
التكوين
كود المصدر الكامل: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout يحدد مقدار الوقت الذي يجب أن يعمل فيه الموجه للمعالجين أثناء الإغلاق.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate تفحص ما إذا كانت هناك أية أخطاء في تكوين الموجه.
func (c RouterConfig) Validate() error {
return nil
}
// ...
المعالج (Handler)
أولاً، تحتاج إلى تنفيذ دالة HandlerFunc
:
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc هي الدالة المستدعاة عند استلام رسالة.
//
// عندما لا تعيد HandlerFunc خطأ، سيتم استدعاء msg.Ack() تلقائيًا.
//
// عندما تعيد HandlerFunc خطأ، سيتم استدعاء msg.Nack().
//
// عند استدعاء msg.Ack() في المعالج وتعيد HandlerFunc خطأ،
// لن يتم إرسال msg.Nack() لأن التأكيد قد أرسل بالفعل.
//
// عند استلام رسائل متعددة (بسبب إرسال msg.Ack() في HandlerFunc أو دعم المشترك لعدة مستهلكين)،
// سيتم تنفيذ HandlerFunc بشكل متزامن.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
ثم، تحتاج إلى استخدام Router.AddHandler
لإضافة معالج جديد:
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler يضيف معالج جديد.
// يجب أن يكون اسم المعالج فريدًا. حالياً، يُستخدم فقط لأغراض التصحيح.
// subscribeTopic هو الموضوع الذي سيتم استلام الرسائل منه بواسطة المعالج.
// publishTopic هو الموضوع الذي ستتم إنشاء الرسائل المُرجعة للمعالج بواسطة الموجه (Router).
// عندما يحتاج المعالج إلى النشر إلى موضوعات متعددة،
// يُوصى بحقن Publisher فقط إلى المعالج أو تنفيذ الوسيطة (middleware)،
// التي يمكنها التقاط الرسائل استنادًا إلى البيانات الوصفية والنشر إلى موضوعات محددة.
// إذا تم إضافة معالج أثناء تشغيل الموجه بالفعل، يجب استدعاء RunHandlers() بشكل صريح.
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("إضافة معالج", watermill.LogFields{
"اسم_المعالج": handlerName,
"الموضوع": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped لا ينتظر دائماً handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler يضيف معالج جديد.
// هذا المعالج لا يمكنه إرجاع الرسائل.
// عندما يعيد رسالة، يحدث خطأ ويتم إرسال Nack.
//
// يجب أن يكون اسم المعالج فريدًا. حالياً، يُستخدم فقط لأغراض التصحيح.
// subscribeTopic هو الموضوع الذي سيتم استلام الرسائل منه بواسطة المعالج.
// subscriber هو مشترك يستخدم لاستهلاك الرسائل.
// إذا تم إضافة معالج أثناء تشغيل الموجه بالفعل، يجب استدعاء RunHandlers() بشكل صريح.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
يُرجى الرجوع إلى مثال الاستخدام في "البدء". الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler يقوم بإرجاع معالج يمكن استخدامه لإضافة وسيط مستوى المعالج أو إيقاف المعالجات.
handler := router.AddHandler(
"struct_handler", // اسم المعالج، يجب أن يكون فريدًا
"incoming_messages_topic", // الموضوع الذي يتم منه قراءة الأحداث
pubSub,
"outgoing_messages_topic", // الموضوع لنشر الأحداث
pubSub,
structHandler{}.Handler,
)
// يتم تنفيذ وسيط مستوى المعالج فقط للمعالجات المحددة
// يمكن إضافة هذا الوسيط بنفس الطريقة كما يتم إضافة وسائط مستوى الموجه
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("تنفيذ وسيط معالج محدد، UUID الرسالة: ", message.UUID)
return h(message)
}
})
// ...
عدم وجود معالج الناشر
لن يقوم كل معالج بإنشاء رسالة جديدة. يمكنك استخدام Router.AddNoPublisherHandler
لإضافة هذا النوع من المعالج:
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler يضيف معالجًا جديدًا.
// هذا المعالج لا يمكنه إرجاع الرسائل.
// عندما يقوم بإرجاع رسالة، سيحدث خطأ وسيتم إرسال Nack.
//
// يجب أن يكون اسم المعالج فريدًا ويستخدم حاليًا فقط لأغراض التصحيح.
//
// subscribeTopic هو الموضوع الذي سيتلقى عليه المعالج الرسائل.
//
// يُستخدم subscriber لاستهلاك الرسائل.
//
// إذا قمت بإضافة معالج إلى جهاز توجيه مشغل بالفعل، فيجب عليك استدعاء RunHandlers() بشكل صريح.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
الاعتراف
بشكل افتراضي، عندما لا يعيد HanderFunc
خطأ، سيتم استدعاء msg.Ack()
. إذا تم إرجاع خطأ، سيتم استدعاء msg.Nack()
. لذا، بعد التعامل مع الرسالة، لا تحتاج إلى استدعاء msg.Ack()
أو msg.Nack
(بالطبع، يمكنك فعل ذلك إذا كنت ترغب).
إنتاج الرسائل
عندما يُعاد عدة رسائل بواسطة المعالج، يُرجى ملاحظة أن معظم تنفيذات Publisher لا تدعم النشر الذري للرسائل. إذا كان الوسيط أو التخزين غير متاح، فقد يتم إنشاء رسائل فقط وسيتم إرسال msg.Nack()
.
إذا كانت هذه مشكلة، فكر في جعل كل معالج ينشر رسالة واحدة فقط.
تشغيل الجهاز التوجيه
لتشغيل الجهاز التوجيه، يجب عليك استدعاء Run()
.
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run يُشغل جميع البرامج المكملة والمعالجات، ويبدأ الاشتراك في المواضيع المعطاة.
// يقوم هذا الاستدعاء بمنع تشغيل الجهاز التوجيه.
//
// عندما تتوقف جميع المعالجات (على سبيل المثال لأن الاشتراك تم إغلاقه)، سيتوقف الجهاز التوجيه أيضًا.
//
// لإيقاف Run()، يجب عليك استدعاء Close() على الجهاز التوجيه.
//
// سيتم نقل ctx إلى جميع المشتركين.
//
// عندما تتوقف جميع المعالجات (على سبيل المثال: بسبب اتصالات مغلقة)، سيتوقف Run() أيضًا.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
ضمان تشغيل الجهاز التوجيه
قد يكون من المفيد فهم ما إذا كان الجهاز التوجيه يعمل. يُمكنك تحقيق ذلك باستخدام الطريقة Running()
.
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running يغلق عندما يكون الجهاز التوجيه قيد التشغيل.
// بمعنى آخر، يُمكنك الانتظار حتى يبدأ الجهاز التوجيه بشكل مشابه لهذا:
// fmt.Println("Starting router")
// go r.Run(ctx)
// // fmt.Println("Router is running")
// تحذير: لأسباب تاريخية، هذه القناة لا تعرف عن إغلاق الجهاز التوجيه - ستغلق إذا استمر الجهاز التوجيه في التشغيل ثم أُغلق.
func (r *Router) Running() chan struct{} {
// ...
}
يُمكنك أيضًا استخدام الدالة IsRunning
التي تُرجع قيمة بولية:
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning تُرجع true عندما يكون الجهاز التوجيه قيد التشغيل.
//
// تحذير: لأسباب تاريخية، هذه الطريقة لا تعرف عن حالة إغلاق الجهاز التوجيه.
// إذا كنت تريد معرفة ما إذا تم إغلاق الجهاز التوجيه، استخدم IsClosed.
func (r *Router) IsRunning() bool {
// ...
}
إيقاف تشغيل الموجه
لإيقاف تشغيل الموجه، يجب عليك استدعاء Close()
.
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close تغلق الموجه بشكل سلس بعد انتهاء المهلة الموجودة في التكوين.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
سيوقف جميع الناشرين والمشتركين، وسينتظر حتى اكتمال كافة المعالجين.
Close()
سينتظر حتى ينتهي المهلة المحددة في RouterConfig.CloseTimeout
في التكوين. إذا تم الوصول إلى المهلة، سيُعيد Close()
خطأ.
إضافة معالجين بعد بدء تشغيل الموجه
يمكنك إضافة معالج جديد عندما يكون الموجه قيد التشغيل بالفعل. للقيام بذلك، يجب عليك استدعاء AddNoPublisherHandler
أو AddHandler
، ثم استدعاء RunHandlers
.
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers يشغل كافة المعالجين الذين تمت إضافتهم بعد Run().
// RunHandlers هو مكرر، لذا يمكن استدعاؤه مرارًا بأمان.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
إيقاف المعالجين القائمين بالتشغيل
يمكنك إيقاف معالج قائم بالتشغيل واحد فقط عن طريق استدعاء Stop()
.
يرجى ملاحظة أن الموجه سيتوقف عندما لا يكون هناك معالجين قائمين بالتشغيل.
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop يوقف المعالج.
// Stop هو غير متزامن.
// يمكنك التحقق مما إذا تم إيقاف المعالج بواسطة الدالة Stopped().
func (h *Handler) Stop() {
// ...
نموذج التنفيذ
المشتركون يمكن أن يستهلكوا رسالة واحدة بشكل تسلسلي أو عدة رسائل متوازيا.
-
تدفق الرسالة الفردية هو الأسلوب الأبسط، الذي يعني أن المشتركون لن يتلقوا رسائل جديدة حتى يتم استدعاء
msg.Ack()
. -
تدفق الرسائل المتعددة مدعوم فقط بواسطة بعض المشتركين. من خلال الاشتراك في عدة أقسام للموضوع مباشرة، يمكن استهلاك عدة رسائل متزامنة، حتى الرسائل التي لم يتم التأكيد عليها مسبقًا (على سبيل المثال كيفية عمل المشتركين في كافكا). يقوم الموجه بمعالجة هذا النموذج عن طريق تشغيل
HandlerFunc
متوازيًا.
يرجى الرجوع إلى وثائق Pub/Sub المحددة لفهم النماذج التنفيذية المدعومة.
الوسيطة
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware يسمح لنا بكتابة شيء مشابه للمزين لـ HandlerFunc.
// يمكنه تنفيذ بعض العمليات قبل (على سبيل المثال، تعديل الرسالة المستهلكة) أو بعد المعالج (تعديل الرسالة المولدة، تأكيد/رفض الرسالة المستهلكة، التعامل مع الأخطاء، تسجيل، وما إلى ذلك).
//
// يمكن إرفاقه بالموجه باستخدام طريقة `AddMiddleware`.
//
// مثال:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("تم التنفيذ قبل المعالج")
// producedMessages, err := h(message)
// fmt.Println("تم التنفيذ بعد المعالج")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
يمكن العثور على قائمة كاملة من الوسائط القياسية في Middlewares.
المكونات الإضافية
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin هو دالة يتم تنفيذها عند بدء تشغيل الموجه.
type RouterPlugin func(*Router) error
// ...
يمكن العثور على قائمة كاملة من المكونات الإضافية القياسية في message/router/plugin.
السياق
يتم تخزين بعض القيم المفيدة في context
لكل رسالة يتم استقبالها بواسطة المعالج:
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx يُرجع اسم معالج الرسالة في الموجه الذي استهلك الرسالة من السياق.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx يُرجع اسم نوع ناشر الرسالة في الموجه من السياق.
// على سبيل المثال، بالنسبة لـ Kafka، سيكون `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx يُرجع اسم نوع مشترك الرسالة في الموجه من السياق.
// على سبيل المثال، بالنسبة لـ Kafka، سيكون `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx يُرجع الموضوع الذي تم استقبال الرسالة منه في الموجه من السياق.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx يُرجع الموضوع الذي ستُنشر فيه الرسالة في الموجه من السياق.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...