स्थापना
go get -u github.com/ThreeDotsLabs/watermill
तैयारी
घटना-निर्देशित अनुप्रयोगों के पीछे की मौलिक विचारधारा हमेशा एक ही होती है: आने वाले संदेशों को सुनना और उनके प्रति प्रतिक्रिया करना। Watermill कई प्रकार के प्रकाशक और ग्राहक के लिए इस व्यवहार को लागू करने का समर्थन करता है।
Watermill का मूल भाग संदेश है, जो http
पैकेज में http.Request
के तरह महत्वपूर्ण है। ज्यादातर Watermill सुविधाएँ किसी न किसी तरीके से इस स्ट्रक्चर का उपयोग करती हैं।
पबसुब लाइब्रेरी द्वारा प्रदान की जाने वाली जटिल सुविधाओं के बावजूद, Watermill के लिए उन्हें उपयोग करना शुरू करने के लिए केवल दो इंटरफेस को लागू करना आवश्यक है: Publisher
और Subscriber
।
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
संदेशों की सदस्यता लेना
सदस्यता लेने के साथ शुरू करते हैं। Subscribe
एक विषय नाम की अपेक्षा करता है और आने वाले संदेश प्राप्त करने के लिए एक चैनल वापस करता है। विषय का विशेष अर्थ पबसब अनुक्रमणिका के अनुमानित अर्थ पर निर्भर करता है।
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("प्राप्त संदेश: %s, पेलोड: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
समर्थित पबसब के विस्तृत उदाहरणों के लिए कृपया निम्नलिखित सामग्री का उपयोग करें।
गो चैनल उदाहरण
पूर्ण उदाहरण कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
पूर्ण उदाहरण कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("प्राप्त संदेश: %s, पेलोड: %s\n", msg.UUID, string(msg.Payload))
// हमें स्वीकार करना चाहिए कि हमने संदेश को प्राप्त और प्रसंस्कृत किया है,
// अन्यथा इसे कई बार फिर से भेजा जाएगा।
msg.Ack()
}
}
Kafka उदाहरण
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// auto.offset.reset: earliest के समान
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("संदेश प्राप्त हुआ: %s, पेयलोड: %s", msg.UUID, string(msg.Payload))
// हमें स्वीकार करना होगा कि हमने संदेश प्राप्त किया और प्रसंस्कृत किया है,
// अन्यथा संदेश बार-बार पुनरारंभ किया जाएगा।
msg.Ack()
}
}
RabbitMQ (AMQP) उदाहरण
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// यह विन्यास निम्न उदाहरण पर आधारित है: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// यह एक सरल कतार के रूप में उपयोग किया जाता है।
//
// यदि आप एक पब / सब प्रकार की सेवा का अनुमान लगाना चाहते हैं, तो कृपया संदर्भित करें
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("संदेश प्राप्त हुआ: %s, पेयलोड: %s", msg.UUID, string(msg.Payload))
// हमें स्वीकार करना चाहिए कि संदेश प्राप्त और प्रसंस्कृत किया गया है,
// अन्यथा, संदेश बार-बार पुनर्प्रेषित किया जाएगा।
msg.Ack()
}
}
SQL उदाहरण
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
पैकेज मुख्य
import (
संदर्भ (
context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "उदाहरण_विषय")
if err != nil {
panic(err)
}
go process(messages)
// ...
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("संदेश प्राप्त: %s, पेलोड: %s", msg.UUID, string(msg.Payload))
// हमें स्वीकार करना होगा कि संदेश प्राप्त कर लिया गया है और प्रसंस्करण किया गया है,
// अन्यथा, संदेश बार-बार पुनः वितरित किया जाएगा।
msg.Ack()
}
}
संदेश बनाएं
Watermill किसी भी संदेश प्रारूप को बाध्य नहीं करता। NewMessage
पेलोड को एक बाइट स्लाइस होने की उम्मीद करता है। आप स्ट्रिंग, JSON, protobuf, Avro, gob, या किसी अन्य प्रारूप का उपयोग कर सकते हैं जो []byte
में संवर्धित किया जा सकता है।
संदेश UUID वैकल्पिक है, लेकिन यह सुझाया गया है क्योंकि यह डीबगिंग में मदद करता है।
msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))
संदेश प्रकाशित करें
प्रकाशित
मेथड एक विषय और प्रकाशित करने के लिए एक या एक से अधिक संदेश की आवश्यकता होती है।
err := publisher.Publish("उदाहरण_विषय", msg)
if err != nil {
panic(err)
}
गो चैनल उदाहरण
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))
if err := publisher.Publish("उदाहरण_विषय", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
काफ़का उदाहरण
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))
if err := publisher.Publish("उदाहरण_विषय", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
RabbitMQ (AMQP) उदाहरण
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))
if err := publisher.Publish("उदाहरण_विषय", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
SQL उदाहरण
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "नमस्ते, दुनिया!"}`))
if err := publisher.Publish("उदाहरण_विषय", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
संदेश राउटर का उपयोग
प्रकाशक और सब्सक्राइबर Watermill के निचले स्तरीय घटक हैं। अधिकांश मामलों में, आप उच्च स्तरीय इंटरफ़ेस और सुविधाओं जैसे कि सम्मिलन, मीट्रिक्स, जहरीली कतारें, पुनशः प्रयास, और दर सीमा लगाना चाहेंगे।
आप शायद केवल उस भेजे गए संदेश को सफलतापूर्वक प्रसंस्कृत करने के बाद ही संदेश को स्वीकार करना चाहेंगे। किसी अन्य मामलों में, आप इसे तुरंत स्वीकार करना चाहेंगे और तब इसे प्रसंस्कृत करने की सोचेंगे। कभी-कभी, आप आने वाले संदेश पर आधारित कुछ कार्रवाइयां करना चाहेंगे और उत्तर में एक और संदेश प्रकाशित करना चाहेंगे।
इन आवश्यकताओं को पूरा करने के लिए, एक घटक है राउटर का।
संदेश राउटर का उदाहरण एप्लिकेशन
उदाहरण एप्लिकेशन की फ्लो निम्नलिखित है:
- प्रति सेकंड
प्रविष्टि_संदेश_विषय
पर संदेश उत्पन्न करें। -
struct_handler
सुनाने वालाप्रविष्टि_संदेश_विषय
संदेश को संस्कृत करता है। संदेश प्राप्त करते ही, वह UUID छापता है औरनिकटित_संदेश_विषय
पर नया संदेश उत्पन्न करता है। -
print_incoming_messages
हैंडलरप्रविष्टि_संदेश_विषय
पर सुनते हैं और संदेश का UUID, विषय, और मेटाडेटा छापते हैं। -
print_outgoing_messages
हैंडलरनिकटित_संदेश_विषय
पर सुनते हैं और संदेश का UUID, विषय, और मेटाडेटा छापते हैं। सहयोजन आईडी कोप्रविष्टि_संदेश_विषय
पर संदेश के साथ समान होना चाहिए।
राउटर कॉन्फ़िगरेशन
सबसे पहले, प्लगइन और मिडलवेयर जोड़कर, राउटर को कॉन्फ़िगर करें। फिर, राउटर द्वारा उपयोग किए जाने वाले हैंडलर सेट करें। प्रत्येक हैंडलर अलग-अलग प्रकार के संदेश को स्वतंत्र रूप से प्रोसेस करेगा।
पूरा सोर्स कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// इस उदाहरण के लिए, हम एक सरल लॉगर इंप्लीमेंटेशन का उपयोग करते हैं,
// आपको अपने खुद के `watermill.LoggerAdapter` इंप्लीमेंटेशन प्रदान करना चाहिए।
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// SIGTERM सिग्नल प्राप्त करते समय, SignalsHandler ग्रेसफ़ुली राउटर को बंद करेगा।
// आप `r.Close()` को बुलाकर भी राउटर को बंद कर सकते हैं।
router.AddPlugin(plugin.SignalsHandler)
// राउटर स्तर का मिडलवेयर हर संदेश को भेजने के लिए कार्यान्वित किया जाएगा
router.AddMiddleware(
// CorrelationID प्रायोगशाला आईडी को से जनित संदेश की मेटाडेटा से कॉपी करता है
middleware.CorrelationID,
// यदि हैंडलर एक त्रुटि लौटाता है, तो हैंडलर फ़ंक्शन को पुनः प्रयास करें।
// MaxRetries तक पहुंचने के बाद, संदेश Nacked होता है, और PubSub को पुनः भेजने के लिए जिम्मेदार है।
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer हैंडलर में पैनिक को हैंडल करता है।
// इस मामले में, वह उन्हें त्रुटियों के रूप में Retry मिडलवेयर को पारित करता है।
middleware.Recoverer,
)
// सरलता के लिए, हम यहां गोचैनल पब/सब का उपयोग करते हैं,
// आप इसे किसी भी पब/सब इम्प्लीमेंटेशन के साथ बदल सकते हैं और प्रभाव समान रहेगा।
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// पृष्ठीकरण संदेश लेबेल वाले कुछ आने वाले संदेश उत्पन्न करें
go publishMessages(pubSub)
// AddHandler एक हैंडलर देता है जिसे हैंडलर स्तर के मिडलवेयर जोड़ने या हैंडलर को बंद करने के लिए प्रयोग किया जा सकता है।
handler := router.AddHandler(
"struct_handler", // हैंडलर नाम, यूनिक होना चाहिए
"incoming_messages_topic", // जिसमें इवेंट्स पढ़े जाते हैं
pubSub,
"outgoing_messages_topic", // जिसमें इवेंट्स प्रकाशित किए जाते हैं
pubSub,
structHandler{}.Handler,
)
// हैंडलर स्तर का मिडलवेयर केवल विशिष्ट हैंडलर के लिए कार्यान्वित होता है
// इस प्रकार के मिडलवेयर को राउटर स्तर के मिडलवेयर की तरह ही जोड़ा जा सकता है
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Executing handler specific middleware for", message.UUID)
return h(message)
}
})
// डीबगिंग के लिए, हम "incoming_messages_topic" पर प्राप्त सभी संदेशों को प्रिंट करते हैं
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// डीबगिंग के लिए, हम "outgoing_messages_topic" को भेजे गए सभी इवेंट्स को प्रिंट करते हैं
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// अब जब सभी हैंडलर पंजीकृत हैं, हम राउटर को चला रहे हैं।
// रन राउटर को चलाते समय ब्लॉक करता है।
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
आने वाले संदेश
struct_handler
संदेशों को incoming_messages_topic
से सेवन करता है, इसलिए हम पिछली तरफ जाकर publishMessages()
को बुलाकर आने वाले ट्रैफिक को आदरणीय करते हैं। ध्यान दें कि हमने SetCorrelationID
मिडलवेयर जोड़ा है। राउटर सभी जनरेटेड संदेशों में एक को-सम्बंध आईडी जोड़ेगा (मेटाडाटा में संग्रहित)।
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("संदेश %s भेज रहा हूँ, सह-संबंध आईडी: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
हैंडलर
आपने शायद नोट किया हो कि दो प्रकार के हैंडलर फ़ंक्शन होते हैं:
- Function
func(msg *message.Message) ([]*message.Message, error)
- Method
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
अगर आपका हैंडलर कोई डिपेंडेंसी पर निर्भर नहीं करता है, तो पहले विकल्प का उपयोग ठीक है। जब आपके हैंडलर को कुछ डिपेंडेंसी (जैसे डेटाबेस हैंडल, लॉगर्स, आदि) की आवश्यकता होती है, तो दूसरा विकल्प उपयोगी होता है।
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> संदेश प्राप्त हुआ: %s\n> %s\n> मेटाडाटा: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// यहाँ हम कुछ डिपेंडेंसी जोड़ सकते हैं
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler ने संदेश प्राप्त किया", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("structHandler द्वारा उत्पन्न संदेश"))
return message.Messages{msg}, nil
}
ऑके!
आप इस उदाहरण को go run main.go
से चला सकते हैं।