स्थापना

go get -u github.com/ThreeDotsLabs/watermill

तैयारी

घटना-निर्देशित अनुप्रयोगों के पीछे की मौलिक विचारधारा हमेशा एक ही होती है: आने वाले संदेशों को सुनना और उनके प्रति प्रतिक्रिया करना। Watermill कई प्रकार के प्रकाशक और ग्राहक के लिए इस व्यवहार को लागू करने का समर्थन करता है।

Watermill का मूल भाग संदेश है, जो http पैकेज में http.Request के तरह महत्वपूर्ण है। ज्यादातर Watermill सुविधाएँ किसी न किसी तरीके से इस स्ट्रक्चर का उपयोग करती हैं।

पबसुब लाइब्रेरी द्वारा प्रदान की जाने वाली जटिल सुविधाओं के बावजूद, Watermill के लिए उन्हें उपयोग करना शुरू करने के लिए केवल दो इंटरफेस को लागू करना आवश्यक है: Publisher और Subscriber

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

संदेशों की सदस्यता लेना

सदस्यता लेने के साथ शुरू करते हैं। Subscribe एक विषय नाम की अपेक्षा करता है और आने वाले संदेश प्राप्त करने के लिए एक चैनल वापस करता है। विषय का विशेष अर्थ पबसब अनुक्रमणिका के अनुमानित अर्थ पर निर्भर करता है।

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("प्राप्त संदेश: %s, पेलोड: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

समर्थित पबसब के विस्तृत उदाहरणों के लिए कृपया निम्नलिखित सामग्री का उपयोग करें।

गो चैनल उदाहरण

पूर्ण उदाहरण कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

पूर्ण उदाहरण कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
	fmt.Printf("प्राप्त संदेश: %s, पेलोड: %s\n", msg.UUID, string(msg.Payload))

	// हमें स्वीकार करना चाहिए कि हमने संदेश को प्राप्त और प्रसंस्कृत किया है,
	// अन्यथा इसे कई बार फिर से भेजा जाएगा।
	msg.Ack()
}
}

Kafka उदाहरण

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// auto.offset.reset: earliest के समान
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("संदेश प्राप्त हुआ: %s, पेयलोड: %s", msg.UUID, string(msg.Payload))

		// हमें स्वीकार करना होगा कि हमने संदेश प्राप्त किया और प्रसंस्कृत किया है,
		// अन्यथा संदेश बार-बार पुनरारंभ किया जाएगा।
		msg.Ack()
	}
}

RabbitMQ (AMQP) उदाहरण

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// यह विन्यास निम्न उदाहरण पर आधारित है: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// यह एक सरल कतार के रूप में उपयोग किया जाता है।
		//
		// यदि आप एक पब / सब प्रकार की सेवा का अनुमान लगाना चाहते हैं, तो कृपया संदर्भित करें
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("संदेश प्राप्त हुआ: %s, पेयलोड: %s", msg.UUID, string(msg.Payload))

		// हमें स्वीकार करना चाहिए कि संदेश प्राप्त और प्रसंस्कृत किया गया है,
		// अन्यथा, संदेश बार-बार पुनर्प्रेषित किया जाएगा।
		msg.Ack()
	}
}

SQL उदाहरण

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
पैकेज मुख्य

import (
	संदर्भ (
		context"
		stdSQL "database/sql"
		"log"
		"time"

		driver "github.com/go-sql-driver/mysql"

		"github.com/ThreeDotsLabs/watermill"
		"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
		"github.com/ThreeDotsLabs/watermill/message"
	)
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "उदाहरण_विषय")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("संदेश प्राप्त: %s, पेलोड: %s", msg.UUID, string(msg.Payload))

		// हमें स्वीकार करना होगा कि संदेश प्राप्त कर लिया गया है और प्रसंस्करण किया गया है,
		// अन्यथा, संदेश बार-बार पुनः वितरित किया जाएगा।
		msg.Ack()
	}
}

संदेश बनाएं

Watermill किसी भी संदेश प्रारूप को बाध्य नहीं करता। NewMessage पेलोड को एक बाइट स्लाइस होने की उम्मीद करता है। आप स्ट्रिंग, JSON, protobuf, Avro, gob, या किसी अन्य प्रारूप का उपयोग कर सकते हैं जो []byte में संवर्धित किया जा सकता है।

संदेश UUID वैकल्पिक है, लेकिन यह सुझाया गया है क्योंकि यह डीबगिंग में मदद करता है।

msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))

संदेश प्रकाशित करें

प्रकाशित मेथड एक विषय और प्रकाशित करने के लिए एक या एक से अधिक संदेश की आवश्यकता होती है।

err := publisher.Publish("उदाहरण_विषय", msg)
if err != nil {
    panic(err)
}

गो चैनल उदाहरण

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))

		if err := publisher.Publish("उदाहरण_विषय", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

काफ़का उदाहरण

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))

		if err := publisher.Publish("उदाहरण_विषय", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

RabbitMQ (AMQP) उदाहरण

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))

		if err := publisher.Publish("उदाहरण_विषय", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

SQL उदाहरण

पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "नमस्ते, दुनिया!"}`))

		if err := publisher.Publish("उदाहरण_विषय", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

संदेश राउटर का उपयोग

प्रकाशक और सब्सक्राइबर Watermill के निचले स्तरीय घटक हैं। अधिकांश मामलों में, आप उच्च स्तरीय इंटरफ़ेस और सुविधाओं जैसे कि सम्मिलन, मीट्रिक्स, जहरीली कतारें, पुनशः प्रयास, और दर सीमा लगाना चाहेंगे।

आप शायद केवल उस भेजे गए संदेश को सफलतापूर्वक प्रसंस्कृत करने के बाद ही संदेश को स्वीकार करना चाहेंगे। किसी अन्य मामलों में, आप इसे तुरंत स्वीकार करना चाहेंगे और तब इसे प्रसंस्कृत करने की सोचेंगे। कभी-कभी, आप आने वाले संदेश पर आधारित कुछ कार्रवाइयां करना चाहेंगे और उत्तर में एक और संदेश प्रकाशित करना चाहेंगे।

इन आवश्यकताओं को पूरा करने के लिए, एक घटक है राउटर का।

संदेश राउटर का उदाहरण एप्लिकेशन

उदाहरण एप्लिकेशन की फ्लो निम्नलिखित है:

  1. प्रति सेकंड प्रविष्टि_संदेश_विषय पर संदेश उत्पन्न करें।
  2. struct_handler सुनाने वाला प्रविष्टि_संदेश_विषय संदेश को संस्कृत करता है। संदेश प्राप्त करते ही, वह UUID छापता है और निकटित_संदेश_विषय पर नया संदेश उत्पन्न करता है।
  3. print_incoming_messages हैंडलर प्रविष्टि_संदेश_विषय पर सुनते हैं और संदेश का UUID, विषय, और मेटाडेटा छापते हैं।
  4. print_outgoing_messages हैंडलर निकटित_संदेश_विषय पर सुनते हैं और संदेश का UUID, विषय, और मेटाडेटा छापते हैं। सहयोजन आईडी को प्रविष्टि_संदेश_विषय पर संदेश के साथ समान होना चाहिए।

राउटर कॉन्फ़िगरेशन

सबसे पहले, प्लगइन और मिडलवेयर जोड़कर, राउटर को कॉन्फ़िगर करें। फिर, राउटर द्वारा उपयोग किए जाने वाले हैंडलर सेट करें। प्रत्येक हैंडलर अलग-अलग प्रकार के संदेश को स्वतंत्र रूप से प्रोसेस करेगा।

पूरा सोर्स कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// इस उदाहरण के लिए, हम एक सरल लॉगर इंप्लीमेंटेशन का उपयोग करते हैं,
	// आपको अपने खुद के `watermill.LoggerAdapter` इंप्लीमेंटेशन प्रदान करना चाहिए।
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SIGTERM सिग्नल प्राप्त करते समय, SignalsHandler ग्रेसफ़ुली राउटर को बंद करेगा।
	// आप `r.Close()` को बुलाकर भी राउटर को बंद कर सकते हैं।
	router.AddPlugin(plugin.SignalsHandler)

	// राउटर स्तर का मिडलवेयर हर संदेश को भेजने के लिए कार्यान्वित किया जाएगा
	router.AddMiddleware(
		// CorrelationID प्रायोगशाला आईडी को से जनित संदेश की मेटाडेटा से कॉपी करता है
		middleware.CorrelationID,

		// यदि हैंडलर एक त्रुटि लौटाता है, तो हैंडलर फ़ंक्शन को पुनः प्रयास करें।
		// MaxRetries तक पहुंचने के बाद, संदेश Nacked होता है, और PubSub को पुनः भेजने के लिए जिम्मेदार है।
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer हैंडलर में पैनिक को हैंडल करता है।
		// इस मामले में, वह उन्हें त्रुटियों के रूप में Retry मिडलवेयर को पारित करता है।
		middleware.Recoverer,
	)

	// सरलता के लिए, हम यहां गोचैनल पब/सब का उपयोग करते हैं,
	// आप इसे किसी भी पब/सब इम्प्लीमेंटेशन के साथ बदल सकते हैं और प्रभाव समान रहेगा।
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// पृष्ठीकरण संदेश लेबेल वाले कुछ आने वाले संदेश उत्पन्न करें
	go publishMessages(pubSub)

	// AddHandler एक हैंडलर देता है जिसे हैंडलर स्तर के मिडलवेयर जोड़ने या हैंडलर को बंद करने के लिए प्रयोग किया जा सकता है।
	handler := router.AddHandler(
		"struct_handler",          // हैंडलर नाम, यूनिक होना चाहिए
		"incoming_messages_topic", // जिसमें इवेंट्स पढ़े जाते हैं
		pubSub,
		"outgoing_messages_topic", // जिसमें इवेंट्स प्रकाशित किए जाते हैं
		pubSub,
		structHandler{}.Handler,
	)

	// हैंडलर स्तर का मिडलवेयर केवल विशिष्ट हैंडलर के लिए कार्यान्वित होता है
	// इस प्रकार के मिडलवेयर को राउटर स्तर के मिडलवेयर की तरह ही जोड़ा जा सकता है
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Executing handler specific middleware for", message.UUID)

			return h(message)
		}
	})

	// डीबगिंग के लिए, हम "incoming_messages_topic" पर प्राप्त सभी संदेशों को प्रिंट करते हैं
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// डीबगिंग के लिए, हम "outgoing_messages_topic" को भेजे गए सभी इवेंट्स को प्रिंट करते हैं
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// अब जब सभी हैंडलर पंजीकृत हैं, हम राउटर को चला रहे हैं।
	// रन राउटर को चलाते समय ब्लॉक करता है।
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

आने वाले संदेश

struct_handler संदेशों को incoming_messages_topic से सेवन करता है, इसलिए हम पिछली तरफ जाकर publishMessages() को बुलाकर आने वाले ट्रैफिक को आदरणीय करते हैं। ध्यान दें कि हमने SetCorrelationID मिडलवेयर जोड़ा है। राउटर सभी जनरेटेड संदेशों में एक को-सम्बंध आईडी जोड़ेगा (मेटाडाटा में संग्रहित)।

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("नमस्ते, दुनिया!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("संदेश %s भेज रहा हूँ, सह-संबंध आईडी: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

हैंडलर

आपने शायद नोट किया हो कि दो प्रकार के हैंडलर फ़ंक्शन होते हैं:

  1. Function func(msg *message.Message) ([]*message.Message, error)
  2. Method func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

अगर आपका हैंडलर कोई डिपेंडेंसी पर निर्भर नहीं करता है, तो पहले विकल्प का उपयोग ठीक है। जब आपके हैंडलर को कुछ डिपेंडेंसी (जैसे डेटाबेस हैंडल, लॉगर्स, आदि) की आवश्यकता होती है, तो दूसरा विकल्प उपयोगी होता है।

पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> संदेश प्राप्त हुआ: %s\n> %s\n> मेटाडाटा: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // यहाँ हम कुछ डिपेंडेंसी जोड़ सकते हैं
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler ने संदेश प्राप्त किया", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("structHandler द्वारा उत्पन्न संदेश"))
    return message.Messages{msg}, nil
}

ऑके!

आप इस उदाहरण को go run main.go से चला सकते हैं।