التثبيت

go get -u github.com/ThreeDotsLabs/watermill

التحضير

الفكرة الأساسية لتطبيقات الحدث هي دائمًا نفسها: الاستماع لرسائل الواردة والاستجابة لها. يدعم Watermill تنفيذ هذا السلوك للناشرين والمشتركين متعددين.

الجزء الأساسي من Watermill هو Message، الذي يعتبر مهمًا مثل http.Request في حزمة http. تستخدم معظم ميزات Watermill هذه الهيكل بطريقة ما.

على الرغم من الميزات المعقدة التي يوفرها مكتبة PubSub، فإنه لا يتطلب الأمر بالنسبة لـ Watermill أن تنفيذ واجهتين فقط للبدء في استخدامها: Publisher و Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

الاشتراك في الرسائل

لنبدأ بالاشتراك. يتوقع Subscribe اسم موضوع ويقوم بإرجاع قناة لاستقبال الرسائل الواردة. المعنى المحدد لـ الموضوع يعتمد على تنفيذ PubSub.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

لمشاهدة أمثلة مفصلة لـ PubSub المدعوم، يُرجى الرجوع إلى المحتوى التالي.

مثال القناة في Go

كود المثال الكامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

كود المثال الكامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

	// نحن بحاجة إلى الاعتراف بأننا استلمنا وعالجنا الرسالة،
	// وإلا سترسل مرات عدة.
	msg.Ack()
	}
}

مثال Kafka

الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// ما يعادل auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("تم استقبال الرسالة: %s، الحمولة: %s", msg.UUID, string(msg.Payload))

		// نحتاج إلى التأكيد بأننا قمنا بتلقي ومعالجة الرسالة،
		// وإلا ستعاد إعادة إرسال الرسالة مرارًا وتكرارًا.
		msg.Ack()
	}
}

مثال RabbitMQ (AMQP)

الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// تستند هذه الاعدادات على المثال التالي: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// يُستخدم كصف قط
		//
		// إذا كنت ترغب في تنفيذ خدمة بنمط النشر/الاشتراك، يرجى الرجوع إلى
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("تم استقبال الرسالة: %s، الحمولة: %s", msg.UUID, string(msg.Payload))

		// نحتاج إلى التأكيد بأن الرسالة تم استقبالها ومعالجتها،
		// وإلا، ستتم إعادة تسليم الرسالة مرارًا وتكرارًا.
		msg.Ack()
	}
}

مثال SQL

الشفرة البرمجية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

الشفرة البرمجية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// نحتاج إلى الاعتراف بأن الرسالة تم استلامها ومعالجتها،
		// وإلا ستتم إعادة تسليم الرسالة مرارًا وتكرارًا.
		msg.Ack()
	}
}

إنشاء الرسالة

لا يفرض Watermill أي تنسيق للرسالة. يتوقع NewMessage أن تكون الحمولة مصفوفة بايت. يمكنك استخدام السلاسل النصية، JSON، protobuf، Avro، gob، أو أي تنسيق آخر يمكن تسلسله إلى []byte.

رقم UUID للرسالة اختياري، لكن من المستحسن استخدامه حيث يساعد في عمليات التصحيح.

msg := message.NewMessage(watermill.NewUUID(), []byte("مرحبًا، عالم!"))

نشر الرسالة

يتطلب الطريقة Publish موضوعًا ورسالة واحدة أو أكثر للنشر.

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

مثال قناة Go

الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("مرحبًا، عالم!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

مثال Kafka

الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("مرحبًا، عالم!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

مثال على RabbitMQ (AMQP)

الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("مرحباً، العالم!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

مثال على SQL

الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "مرحباً، العالم!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

استخدام Message Router

الناشرين والمشتركين هم أجزاء منخفضة المستوى في Watermill. في معظم الحالات، سترغب عادة في استخدام واجهات وميزات عالية المستوى مثل الترابط، القياسات، طوابير السموم، إعادة المحاولة، وتقييد السرعة.

قد ترغب في التأكيد على الرسالة بمجرد معالجتها بنجاح. في حالات أخرى، قد ترغب في التأكيد عليها على الفور ومن ثم النظر في معالجتها. في بعض الأحيان، قد ترغب في القيام ببعض الإجراءات استنادًا إلى الرسالة الواردة ونشر رسالة أخرى ردًا عليها.

لتلبية هذه المتطلبات، هناك عنصر يسمى Router.

تطبيق مثال على Message Router

تدفق التطبيق المثالي كما يلي:

  1. إنشاء رسالة على incoming_messages_topic كل ثانية.
  2. يتعامل المستمع struct_handler مع incoming_messages_topic. عند استلام رسالة، يقوم بطباعة UUID وإنشاء رسالة جديدة على outgoing_messages_topic.
  3. يستمع المستمع print_incoming_messages على incoming_messages_topic ويطبع UUID والحمولة والبيانات الوصفية للرسالة.
  4. يستمع المستمع print_outgoing_messages على outgoing_messages_topic ويطبع UUID والحمولة والبيانات الوصفية للرسالة. يجب أن يكون معرف الارتباط نفس الرسالة على incoming_messages_topic.

تكوين الراوتر

أولاً ، قم بتكوين الراوتر عن طريق إضافة الوصلات البرمجية والوسيطة. ثم قم بتعيين المعالجين التي سيستخدمها الراوتر. سيقوم كل معالج بمعالجة الرسائل بشكل مستقل.

الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt
	"log
	"time

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message
	"github.com/ThreeDotsLabs/watermill/message/router/middleware
	"github.com/ThreeDotsLabs/watermill/message/router/plugin
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel
)

var (
	// لهذا المثال ، نستخدم تنفيذ سجل بيانات بسيط ،
	// قد ترغب في توفير تنفيذ خاص به من `watermill.LoggerAdapter`.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// عند تلقي إشارة SIGTERM ، سيقوم SignalsHandler بإغلاق الراوتر بشكلٍ مهيّء.
	// يمكنك أيضاً إغلاق الراوتر عن طريق استدعاء `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// سيتم تنفيذ متوسط البرامج على مستوى الراوتر لكل رسالة يتم إرسالها إلى الراوتر
	router.AddMiddleware(
		// يقوم CorrelationID بنسخ مُعرف الترابط من بيانات الرسالة الواردة إلى الرسالة المولّدة
		middleware.CorrelationID,

		// إذا عاد المعالج بخطأ ، فأعد تشغيل دالة المعالج.
		// بعد الوصول إلى MaxRetries ، يتم نفي الرسالة ، ويتحمل نظام النشر والاشتراك مسؤولية إعادة إرسالها.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// يُعالج Recoverer الانهيارات في المعالج.
		// في هذه الحالة ، يمررها كأخطاء إلى أدوات البرامج الوسيطة.
		middleware.Recoverer,
	)

	// لأغراض بساطة ، نستخدم gochannel Pub/Sub هنا،
	// يمكنك استبداله بأي تنفيذ للنشر والاشتراك وستكون النتيجة نفسها.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// إنشاء بعض الرسائل الواردة في الخلفية
	go publishMessages(pubSub)

	// يعيد AddHandler معالجًا يمكن استخدامه لإضافة وسيطًا على مستوى المعالج أو لإيقاف المعالج.
	handler := router.AddHandler(
		"struct_handler",          // اسم المعالج ، يجب أن يكون فريدًا
		"incoming_messages_topic", // الموضوع الذي يتم منه قراءة الأحداث
		pubSub,
		"outgoing_messages_topic", // الموضوع الذي يتم نشر الأحداث إليه
		pubSub,
		structHandler{}.Handler,
	)

	// سيتم تنفيذ وسيط مستوى المعالج فقط لمعالج محدد
	// يمكن إضافة هذا النوع من الوسيط بنفس الطريقة كوسائط برمجة مستوى الراوتر
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("تنفيذ وسيط محدد للمعالج لـ", message.UUID)

			return h(message)
		}
	})

	// لأغراض تصحيح الأخطاء ، سنطبع كل الرسائل التي تُستلم على "incoming_messages_topic"
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// لأغراض تصحيح الأخطاء ، سنطبع كل الأحداث التي يتم إرسالها إلى "outgoing_messages_topic"
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// الآن بعد تسجيل جميع المعالجين ، نقوم بتشغيل الراوتر.
	// Run يحجب أثناء تشغيل الراوتر.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

الرسائل الواردة

تستهلك struct_handler الرسائل من incoming_messages_topic، لذا نحاكي حركة الرسائل الواردة من خلال استدعاء publishMessages() في الخلفية. لاحظ أننا أضفنا middleware SetCorrelationID. سيقوم الموجه بإضافة مُعرف الارتباط إلى جميع الرسائل المُنشأة (المُخزنة في البيانات الوصفية).

الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("مرحبًا، عالم!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("إرسال رسالة %s، مُعرف الارتباط: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

المعالجون

قد تكون قد لاحظت وجود نوعين من دوال المعالج:

  1. دالة func(msg *message.Message) ([]*message.Message, error)
  2. طريقة func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

إذا كان المعالج الخاص بك دالة لا تعتمد على أي تبعيات، فإن استخدام الخيار الأول مقبول. عندما يتطلب المعالج بعض التبعيات (مثل مقابض قاعدة البيانات، والسجلات، إلخ)، فإن الاستخدام الخيار الثاني مفيد.

الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> تم استقبال الرسالة: %s\n> %s\n> البيانات الوصفية: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // هنا يمكننا إضافة بعض التبعيات
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("قام structHandler بتلقي الرسالة", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("الرسالة المُنتَجة بواسطة structHandler"))
    return message.Messages{msg}, nil
}

تم الانتهاء!

يمكنك تشغيل هذا المثال باستخدام go run main.go.