تنصيب
go get -u github.com/ThreeDotsLabs/watermill
تیاری
وقیتی ایپلیکیشن کے پیشئے درستی کے لیے بنیادی خیال ہمیشہ یہ ہوتا ہے: آنے والے پیغامات کی سننا اور ان کی تکرار. واٹرمل مختلف پبلیشرز اور سبسکرائبرز کے لیے یہ رفتار کو پیمانے پر لانے کا ایجاد کرتا ہے۔
واٹرمل کا بنیادی حصہ پیغام ہے، جو کہ http
پیکیج میں http.Request
کے برابر اہم ہے۔ زیادہ تر واٹرمل فیچرز اس سٹریکٹ کا استعمال کرتے ہیں۔
پبلیشرز اور سبسکرائبرز کا استعمال شروع کرنے کے لیے پبسب لائبریری کے دیکھ بھال پر عین مشتمل نہیں ہے، جبکہ واٹرمل کے لیے، انہیں استعمال کرنے کے لیے صرف دو انٹرفیس پورے کرنے کافی ہے: Publisher
اور Subscriber
۔
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
پیغامات کے سبسکرائب کرنا
پبلیشرز اور سبسکرائبرز کا استعمال شروع کرتے ہیں۔ Subscribe
ایک ٹاپک نام کا انتظار کرتا ہے اور آمدنی پیغامات کے لیے ایک چینل واپس کرتا ہے۔ ٹاپک کا مخصوص معنی پبسب کے تنصیب پر بھیوندیکاد میں منحصر ہے۔
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
پبسب کی مدد سے معاونتی مثالات کے لیے، براہ کرم مندرجہ ذیل Muayyan کی مواد پر رجوع کریں۔
گو چینل مثال
مکمل مثال کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
مکمل مثال کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
// ہمیں تصدیق کرنا ہوں گے کہ ہم نے پیغام کو سنا اور پروسیس کیا ہے،
// ورنہ یہ متعدد بار رسے جائیں گے۔
msg.Ack()
}
}
کافکا کا مثال
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// auto.offset.reset: earliest کے مساوی
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("موصول ہونے والا پیغام: %s, پیلوڈ: %s", msg.UUID, string(msg.Payload))
// ہمیں تصدیق کرنا ہوگا کہ ہم نے پیغام موصول اور عمل کیا ہے،
// ورنہ پیغام بار بار دوبارہ بھیجا جائے گا۔
msg.Ack()
}
}
ریبٹ ایم کیو (AMQP) کا مثال
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// یہ تشکیل مندرجہ ذیل مثال پر مبنی ہے: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// یہ ایک سادہ قطار کے طرز پر استعمال ہوتا ہے۔
//
// اگر آپ ایک پب/سب سٹائل سروس ایمپیلمنٹ کرنا چاہتے ہیں، تو براہ کرم دیکھیں
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("پیغام موصول ہوا: %s, پیلوڈ: %s", msg.UUID, string(msg.Payload))
// ہمیں تصدیق کرنا ہوگا کہ پیغام موصول اور عمل کیا گیا ہے،
// ورنہ پیغام بار بار دوبارہ بھیجا جائے گا۔
msg.Ack()
}
}
SQL Example
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("پیغام موصول ہوا: %s, پیمائش: %s", msg.UUID, string(msg.Payload))
// ہمیں یہ تسلیم کرنا ہوگا کہ پیغام موصول کر لیا گیا ہے اور پروسیس ہوگیا ہے،
// ورنہ، پیغام بار بار دوبارہ تسلیم کیا جائے گا۔
msg.Ack()
}
}
پیغام بنائیں
واٹرمل کسی بھی پیغام فارمیٹ کو نافذ نہیں کرتا۔ NewMessage
کا توقع ہوتا ہے کہ پیمائش باٹ سلائس ہو۔ آپ اسٹرنگ، JSON، پروٹو بف، ایورو، گوب یا کسی اور فارمیٹ کا استعمال کر سکتے ہیں جو []byte
میں سیریلائز ہو سکے۔
پیغام یونیورسل یونیک ایڈینٹیفائر (UUID) اختیاری ہے، مگر یہ مشورہ کیا جاتا ہے کیونکہ یہ دبگ کی ساتھ مدد کرتا ہے۔
msg := message.NewMessage(watermill.NewUUID(), []byte("ہیلو، ورلڈ!"))
پیغام شائع کریں
Publish
میں تاپک اور ایک یا ایک سے زائد پیغامات شائع کرنے کے لئے تھوک چاہئے۔
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
گو چینل مثال
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("ہیلو، ورلڈ!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
کافکا مثال
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("ہیلو، ورلڈ!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
رابطہMQ (AMQP) مثال
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("ہیلو، دنیا!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
SQL مثال
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "ہیلو، دنیا!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
پیغام راولر استعمال کرنے کا طریقہ
ناشرین اور سبقت واٹرمل کے کم تر حصے ہیں۔ زیادہ تر صورتوں میں، آپ عام طور پر تلاش کریں گے کہ تعلقات، میٹرکس، زہریلے قیوم ، دوبارہ کوشش، اور معیار پر محدودیتوں جیسے بلند تر سطح کے انٹرفیس اور خصوصیات استعمال کرنا چاہیں۔
آپ صرف اسے تسلیم کرنا چاہیں گے جب یہ کامیابی سے پراسرار ہوگا۔ دوسری صورتوں میں، آپ اسے فوراً تسلیم کرنا چاہیں گے اور پھر اس کا مطالعہ کرنا چاہیں گے۔ کبھی کبھار، آپ ان آمدنی پر مبنی کچھ امال کرنا چاہیں گے اور اس کے جواب میں ایک اور پیغام شائع کرنا چاہیں گے۔
ان تقاضوں کو پورا کرنے کے لیے، راولر نام کا ایک کمپوننٹ ہوتا ہے۔
پیغام راولر کا مثال اطلاق
مثال کے اطلاق کا دورانداز کچھ یوں ہے:
- ہر سیکنڈ
incoming_messages_topic
پر ایک پیغام پیدا کریں۔ -
struct_handler
سنگت حکم ہےincoming_messages_topic
کو ہینڈل کرے۔ پیغام موصول ہونے پر، یہ UUID چھاپتا ہے اورoutgoing_messages_topic
پر ایک نیا پیغام پیدا کرتا ہے۔ -
print_incoming_messages
ہینڈلرincoming_messages_topic
پر سنگت حکم ہے اور یہ پیغام کا UUID، پیمائش، اور میٹا ڈیٹا چھاپتا ہے۔ -
print_outgoing_messages
ہینڈلرoutgoing_messages_topic
پر سنگت حکم ہے اور یہ پیغام کا UUID، پیمائش، اور میٹا ڈیٹا چھاپتا ہے، جس کا تعلق IDincoming_messages_topic
پر پیغام کے ساتھ ایک جیسا ہونا چاہئے۔
راوٹر تشکیل
پہلے، پلگ ان اور مڈلور کو شامل کرکے راوٹر کو تشکیل دیں۔ پھر، راوٹر کے ہینڈلرز کو سیٹ کریں جنہیں راوٹر استعمال کرے گا۔ ہر ہینڈلر الگ الگ پیغامات کو آزادانہ پروسیس کرے گا۔
مکمل ماخذ کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// اس مثال کے لئے ہم ایک سادہ لاگر پیادہ کرتے ہیں،
// آپ خود `watermill.LoggerAdapter` کی پیادگی فراہم کر سکتے ہیں۔
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// SIGTERM سگنل ملنے پر، SignalsHandler راوٹر کو خوش اختتام دے گا۔
// آپ `r.Close()` کو بھی بلاتے ہوئے، راوٹر کو بند کر سکتے ہیں۔
router.AddPlugin(plugin.SignalsHandler)
// راوٹر کی سطح کے مڈلور وسائل کو ہر بھیجے گئے پیغام کے لئے اجرا کیا جائے گا
router.AddMiddleware(
// CorrelationID کا موافق ایڈ سب سے اہم ہوتا ہے۔
// یہ بریونی پیغام کی میٹا ڈیٹا سے تعلقاتی شناخت کو پیدا کر کے بریونی پیغام میں منتقل کرتا ہے۔
middleware.CorrelationID,
// ہینڈلر ایک غلطی واپس کرتا ہے، تو حنڈلر کو دوبارہ کوشش کرنے والا مڈلور۔
// MaxRetries تک پہنچنے کے بعد، پیغام Nacked ہو جاتا ہے،
// اور PubSub کو اسے دوبارہ بھیجنے کی ذمہ داری ہوتی ہے۔
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer ہینڈلر میں پینک کا علاج کرتا ہے۔
// اس صورت میں، یہ میدلور یہ غلطی کو نقصان کے طور پر Ret میں منتقل کرتا ہے۔
middleware.Recoverer,
)
// سادگی کیلئے، ہم یہاں gochannel Pub/Sub استعمال کرتے ہیں،
// آپ اسے کسی بھی Pub/Sub کی پیادگی سے بدل سکتے ہیں اور اثر یہی ہو گا۔
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// پس منظر میں کچھ آنے والے پیغامات پیدا کریں
go publishMessages(pubSub)
// AddHandler ایک ہینڈلر وصول کرتا ہے جو ہینڈلر کی سطح کے میڈلور کو شامل کرنے یا ہینڈلر کو رکھنے کے لئے استعمال کیا جا سکتا ہے۔
// یا واقعت روکنے کے لئے۔
handler := router.AddHandler(
"struct_handler", // ہینڈلر کا نام، یکساں ہونا چاہئے
"incoming_messages_topic", // وہ ٹاپک جہاں سے واقعات کو پڑھا جاتا ہے
pubSub,
"outgoing_messages_topic", // وہ ٹاپک جہاں واقعات بھیجے جاتے ہیں
pubSub,
structHandler{}.Handler,
)
// ہینڈلر سطح کے میڈلور صرف ایک مخصوص ہینڈلر کے لئے اجرا کیا جاتا ہے
// اس قسم کے میڈلور کو راوٹر کی سطح کے میڈلور کی طرح ہی شامل کیا جا سکتا ہے
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Executing handler specific middleware for", message.UUID)
return h(message)
}
})
// دبگ کے مقاصد کے لئے، ہم "incoming_messages_topic" پر موصول ہونے والے تمام پیغامات چھاپتے ہیں
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// دبگ کے مقاصد کے لئے، ہم "outgoing_messages_topic" کو چھاپتے ہیں جہاں واقعات بھیجے جاتے ہیں
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// اب جب تمام ہینڈلرز رجسٹر ہو چکے ہیں، ہم راوٹر کو چلا رہے ہیں۔
// رن بلوکس جب تک راوٹر چل رہا ہوتا ہے۔
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
واصل ہونے والے پیغامات
struct_handler
incoming_messages_topic
سے پیغامات منسلک کرتا ہے، لہذا ہم پیشہ ورانہ ترسیل کو پیغامات کو بیک گراؤنڈ میں publishMessages()
کے ذریعے سمولیٹ کرتے ہیں۔ نوٹ کریں کہ ہم نے SetCorrelationID
middleware شامل کی ہے۔ راستہ تمام پیدا کردہ پیغاموں میں ایک تعلق ID شامل کرے گا (جو میٹا ڈیٹا میں محفوظ ہوتا ہے)۔
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("ہیلو، دنیا!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("پیغام بھیجنا %s، تعلق ID: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
ہینڈلرز
آپ نے دیکھا ہوگا کہ دو قسم کے ہینڈلر فنکشنز ہیں:
- فنکشن
func(msg *message.Message) ([]*message.Message, error)
- میتھڈ
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
اگر آپ کا ہینڈلر ایک فنکشن ہے جو کسی بھی تعلقات پر منحصر نہیں ہے، تو پہلی اختیار استعمال کرنا ٹھیک ہے۔ جب آپ کے ہینڈلر کوچھ تعلقات (جیسے ڈیٹا بیس ہینڈلز، لاگرز وغیرہ) کی ضرورت ہوتی ہے، تو دوسرا اختیار مفید ہوتا ہے۔
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> موصول ہونے والا پیغام: %s\n> %s\n> میٹا ڈیٹا: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// یہاں ہم کچھ تعلقات شامل کرسکتے ہیں
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler نے پیغام موصول کیا", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("structHandler کی طرف سے پیدا کردہ پیغام"))
return message.Messages{msg}, nil
}
مکمل!
آپ اس مثال کو go run main.go
کے ذریعے چلا سکتے ہیں۔