التثبيت
go get -u github.com/ThreeDotsLabs/watermill
التحضير
الفكرة الأساسية لتطبيقات الحدث هي دائمًا نفسها: الاستماع لرسائل الواردة والاستجابة لها. يدعم Watermill تنفيذ هذا السلوك للناشرين والمشتركين متعددين.
الجزء الأساسي من Watermill هو Message، الذي يعتبر مهمًا مثل http.Request
في حزمة http
. تستخدم معظم ميزات Watermill هذه الهيكل بطريقة ما.
على الرغم من الميزات المعقدة التي يوفرها مكتبة PubSub، فإنه لا يتطلب الأمر بالنسبة لـ Watermill أن تنفيذ واجهتين فقط للبدء في استخدامها: Publisher
و Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
الاشتراك في الرسائل
لنبدأ بالاشتراك. يتوقع Subscribe
اسم موضوع ويقوم بإرجاع قناة لاستقبال الرسائل الواردة. المعنى المحدد لـ الموضوع يعتمد على تنفيذ PubSub.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
لمشاهدة أمثلة مفصلة لـ PubSub المدعوم، يُرجى الرجوع إلى المحتوى التالي.
مثال القناة في Go
كود المثال الكامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
كود المثال الكامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
// نحن بحاجة إلى الاعتراف بأننا استلمنا وعالجنا الرسالة،
// وإلا سترسل مرات عدة.
msg.Ack()
}
}
مثال Kafka
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// ما يعادل auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("تم استقبال الرسالة: %s، الحمولة: %s", msg.UUID, string(msg.Payload))
// نحتاج إلى التأكيد بأننا قمنا بتلقي ومعالجة الرسالة،
// وإلا ستعاد إعادة إرسال الرسالة مرارًا وتكرارًا.
msg.Ack()
}
}
مثال RabbitMQ (AMQP)
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// تستند هذه الاعدادات على المثال التالي: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// يُستخدم كصف قط
//
// إذا كنت ترغب في تنفيذ خدمة بنمط النشر/الاشتراك، يرجى الرجوع إلى
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("تم استقبال الرسالة: %s، الحمولة: %s", msg.UUID, string(msg.Payload))
// نحتاج إلى التأكيد بأن الرسالة تم استقبالها ومعالجتها،
// وإلا، ستتم إعادة تسليم الرسالة مرارًا وتكرارًا.
msg.Ack()
}
}
مثال SQL
الشفرة البرمجية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
الشفرة البرمجية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// نحتاج إلى الاعتراف بأن الرسالة تم استلامها ومعالجتها،
// وإلا ستتم إعادة تسليم الرسالة مرارًا وتكرارًا.
msg.Ack()
}
}
إنشاء الرسالة
لا يفرض Watermill أي تنسيق للرسالة. يتوقع NewMessage
أن تكون الحمولة مصفوفة بايت. يمكنك استخدام السلاسل النصية، JSON، protobuf، Avro، gob، أو أي تنسيق آخر يمكن تسلسله إلى []byte
.
رقم UUID للرسالة اختياري، لكن من المستحسن استخدامه حيث يساعد في عمليات التصحيح.
msg := message.NewMessage(watermill.NewUUID(), []byte("مرحبًا، عالم!"))
نشر الرسالة
يتطلب الطريقة Publish
موضوعًا ورسالة واحدة أو أكثر للنشر.
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
مثال قناة Go
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("مرحبًا، عالم!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
مثال Kafka
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("مرحبًا، عالم!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
مثال على RabbitMQ (AMQP)
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("مرحباً، العالم!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
مثال على SQL
الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "مرحباً، العالم!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
استخدام Message Router
الناشرين والمشتركين هم أجزاء منخفضة المستوى في Watermill. في معظم الحالات، سترغب عادة في استخدام واجهات وميزات عالية المستوى مثل الترابط، القياسات، طوابير السموم، إعادة المحاولة، وتقييد السرعة.
قد ترغب في التأكيد على الرسالة بمجرد معالجتها بنجاح. في حالات أخرى، قد ترغب في التأكيد عليها على الفور ومن ثم النظر في معالجتها. في بعض الأحيان، قد ترغب في القيام ببعض الإجراءات استنادًا إلى الرسالة الواردة ونشر رسالة أخرى ردًا عليها.
لتلبية هذه المتطلبات، هناك عنصر يسمى Router.
تطبيق مثال على Message Router
تدفق التطبيق المثالي كما يلي:
- إنشاء رسالة على
incoming_messages_topic
كل ثانية. - يتعامل المستمع
struct_handler
معincoming_messages_topic
. عند استلام رسالة، يقوم بطباعة UUID وإنشاء رسالة جديدة علىoutgoing_messages_topic
. - يستمع المستمع
print_incoming_messages
علىincoming_messages_topic
ويطبع UUID والحمولة والبيانات الوصفية للرسالة. - يستمع المستمع
print_outgoing_messages
علىoutgoing_messages_topic
ويطبع UUID والحمولة والبيانات الوصفية للرسالة. يجب أن يكون معرف الارتباط نفس الرسالة علىincoming_messages_topic
.
تكوين الراوتر
أولاً ، قم بتكوين الراوتر عن طريق إضافة الوصلات البرمجية والوسيطة. ثم قم بتعيين المعالجين التي سيستخدمها الراوتر. سيقوم كل معالج بمعالجة الرسائل بشكل مستقل.
الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt
"log
"time
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message
"github.com/ThreeDotsLabs/watermill/message/router/middleware
"github.com/ThreeDotsLabs/watermill/message/router/plugin
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel
)
var (
// لهذا المثال ، نستخدم تنفيذ سجل بيانات بسيط ،
// قد ترغب في توفير تنفيذ خاص به من `watermill.LoggerAdapter`.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// عند تلقي إشارة SIGTERM ، سيقوم SignalsHandler بإغلاق الراوتر بشكلٍ مهيّء.
// يمكنك أيضاً إغلاق الراوتر عن طريق استدعاء `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// سيتم تنفيذ متوسط البرامج على مستوى الراوتر لكل رسالة يتم إرسالها إلى الراوتر
router.AddMiddleware(
// يقوم CorrelationID بنسخ مُعرف الترابط من بيانات الرسالة الواردة إلى الرسالة المولّدة
middleware.CorrelationID,
// إذا عاد المعالج بخطأ ، فأعد تشغيل دالة المعالج.
// بعد الوصول إلى MaxRetries ، يتم نفي الرسالة ، ويتحمل نظام النشر والاشتراك مسؤولية إعادة إرسالها.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// يُعالج Recoverer الانهيارات في المعالج.
// في هذه الحالة ، يمررها كأخطاء إلى أدوات البرامج الوسيطة.
middleware.Recoverer,
)
// لأغراض بساطة ، نستخدم gochannel Pub/Sub هنا،
// يمكنك استبداله بأي تنفيذ للنشر والاشتراك وستكون النتيجة نفسها.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// إنشاء بعض الرسائل الواردة في الخلفية
go publishMessages(pubSub)
// يعيد AddHandler معالجًا يمكن استخدامه لإضافة وسيطًا على مستوى المعالج أو لإيقاف المعالج.
handler := router.AddHandler(
"struct_handler", // اسم المعالج ، يجب أن يكون فريدًا
"incoming_messages_topic", // الموضوع الذي يتم منه قراءة الأحداث
pubSub,
"outgoing_messages_topic", // الموضوع الذي يتم نشر الأحداث إليه
pubSub,
structHandler{}.Handler,
)
// سيتم تنفيذ وسيط مستوى المعالج فقط لمعالج محدد
// يمكن إضافة هذا النوع من الوسيط بنفس الطريقة كوسائط برمجة مستوى الراوتر
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("تنفيذ وسيط محدد للمعالج لـ", message.UUID)
return h(message)
}
})
// لأغراض تصحيح الأخطاء ، سنطبع كل الرسائل التي تُستلم على "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// لأغراض تصحيح الأخطاء ، سنطبع كل الأحداث التي يتم إرسالها إلى "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// الآن بعد تسجيل جميع المعالجين ، نقوم بتشغيل الراوتر.
// Run يحجب أثناء تشغيل الراوتر.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
الرسائل الواردة
تستهلك struct_handler
الرسائل من incoming_messages_topic
، لذا نحاكي حركة الرسائل الواردة من خلال استدعاء publishMessages()
في الخلفية. لاحظ أننا أضفنا middleware SetCorrelationID
. سيقوم الموجه بإضافة مُعرف الارتباط إلى جميع الرسائل المُنشأة (المُخزنة في البيانات الوصفية).
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("مرحبًا، عالم!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("إرسال رسالة %s، مُعرف الارتباط: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
المعالجون
قد تكون قد لاحظت وجود نوعين من دوال المعالج:
- دالة
func(msg *message.Message) ([]*message.Message, error)
- طريقة
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
إذا كان المعالج الخاص بك دالة لا تعتمد على أي تبعيات، فإن استخدام الخيار الأول مقبول. عندما يتطلب المعالج بعض التبعيات (مثل مقابض قاعدة البيانات، والسجلات، إلخ)، فإن الاستخدام الخيار الثاني مفيد.
الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> تم استقبال الرسالة: %s\n> %s\n> البيانات الوصفية: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// هنا يمكننا إضافة بعض التبعيات
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("قام structHandler بتلقي الرسالة", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("الرسالة المُنتَجة بواسطة structHandler"))
return message.Messages{msg}, nil
}
تم الانتهاء!
يمكنك تشغيل هذا المثال باستخدام go run main.go
.