ইনস্টলেশন

go get -u github.com/ThreeDotsLabs/watermill

প্রস্তুতি

ইভেন্ট-ড্রিভেন অ্যাপ্লিকেশানের মৌলিক ধারণা সর্বদা একই: আসা বার্তা শোনা এবং তাদের প্রতিক্রিয়া দেওয়া। Watermill এই আচরণটি একাধিক পাবলিশার এবং সাবস্ক্রাইবারের জন্য অনুমোদন করানোয়।

Watermill-এর মৌলিক অংশটি হল মেসেজ যা http প্যাকেজের http.Request এর মতো গুরুত্বপূর্ণ। অনেক উদাহরণে Watermill এই struct টি কিছুটা ব্যবহার করে।

PubSub লাইব্রেরি দ্বারা প্রদান করা জটিল বৈশিষ্ট্যগুলির বিরুদ্ধে, Watermill-এ, এগুলি ব্যবহার করতে শুরু করতে শুধুমাত্র দুটি ইন্টারফেস ইমপ্লিমেন্ট করা প্রয়োজন: Publisher এবং Subscriber

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

মেসেজের সাবস্ক্রাইব

চলুন সাবস্ক্রাইবিং দিয়ে শুরু করি। Subscribe টি একটি টপিক নামের প্রত্যাশিত এবং আসা বার্তা পেতে একটি চ্যানেল ফেরৎ করে। টপিক এর বিশেষ অর্থটি PubSub ইমপ্লিমেন্টেশনে নির্ভর করে।

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

সমর্থিত PubSub এর বিস্তারিত উদাহরণের জন্য, দয়া করে নিম্নলিখিত বিষয়গুলির জন্য পাঠ্যটি দেখুন।

গো চ্যানেল উদাহরণ

সম্পূর্ণ উদাহরণ কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

সম্পূর্ণ উদাহরণ কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

		// আমাদের উপাদানটি গ্রহণ এবং প্রসেস করা হয়েছে, এটি অনুমোদিত করতে হবে,
		// অন্যথায় এটি একাধিক বার পুনরাবৃত্তি হবে।
		msg.Ack()
	}
}

কাফকা উদাহরণ

সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// অটো অফসেট রিসেটের সমতুল্য: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("বার্তা পেয়েছি: %s, পেলোড: %s", msg.UUID, string(msg.Payload))

		// আমাদের প্রমাণীভূত করতে হবে যে, আমরা বার্তা পেয়েছি এবং প্রসেস করেছি,
		// অন্যথায় বার্তা পুনরায় পাঠানো হবে বার্তা পুনরাবৃত্তি.
		msg.Ack()
	}
}

RabbitMQ (AMQP) উদাহরণ

সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// এই কনফিগারেশনটি ভিত্তি করে তৈরি করা: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// এটা একটি সাধারণ কিউ হিসেবে ব্যবহৃত হয়েছে।
		//
		// আপনি যদি একটি পাব/সাব স্টাইল সার্ভিস প্রয়োগ করতে চান, তবে দয়া করে
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups দেখুন
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("বার্তা পেয়েছি: %s, পেলোড: %s", msg.UUID, string(msg.Payload))

		// আমাদের প্রমাণীভূত করতে হবে যে বার্তা পেয়েছে এবং প্রসেস করা হয়েছে,
		// অন্যথায়, বার্তা পুনরপাঠানো হবে বার্তা পুনরাবৃত্তি হবে।
		msg.Ack()
	}
}

SQL উদাহরণ

সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// আমাদের যা মেসেজটি পেয়েছি এবং প্রসেস করা হয়েছে সে জানাতে আমাদের মেসেজ গুলি অ্যাকনালেজ করা প্রয়োজন,
		// অন্যথায়, মেসেজটি পুনরায় প্রেরণ করা হবে অনবরতমভাবে।
		msg.Ack()
	}
}

মেসেজ তৈরি

Watermill কোনও মেসেজ ফরম্যাট বাধ্য করে না। NewMessage পেয়ে সাহায্য করে বাইট স্লাইসে পারদ বোধিত করে। আপনি স্ট্রিং, JSON, protobuf, Avro, gob বা আর কোন ফরম্যাট ব্যবহার করতে পারেন, যা []byte তে সিরিয়ালাইজ করা যায়।

মেসেজ UUID ঐচ্ছিক, তবে এটি ডিবাগিং করার সাথে সাহায্য করে, এটি পুনরায় প্রেরণ করা হয়ে থাকে।

msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

মেসেজ প্রকাশ

Publish মেথডটি একটি টপিক এবং একাধিক মেসেজ প্রকাশের জন্য প্রয়োজন।

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Go চ্যানেল উদাহরণ

সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Kafka উদাহরণ

সম্পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

র‌্যাবিটএমকিউ (এএএমপি) উদাহরণ

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("হ্যালো, বিশ্ব!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

এসকিউএল উদাহরণ

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "হ্যালো, বিশ্ব!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

মেসেজ রাউটার ব্যবহার

পাবলিশার এবং সাবস্ক্রাইবার Watermill-এর নিচুতম স্তরের অংশ। বেশিরভাগ ক্ষেত্রে, আপনি সাধারণত করে করে সম্পর্কিত ইন্টারফেস এবং বৈশিষ্ট্য ব্যবহার করতে চান, যেমন সম্পর্ক, মেট্রিক্স, জেহাদী কিউ, পুনরায় চেষ্টা, এবং হার সীমাবদ্ধতা।

আপনি শুধু মেসেজটি সফলভাবে প্রসেস করা হলে ম্যাসেজটি স্বীকার্য হওয়ার জন্য চান তো। অন্যান্য ক্ষেত্রে, আপনি তা তা্ত্পর্যপূর্ণভাবে শুনি এবং তারপর পরিষ্কার করতে চান। অক্ষম হলে, আপনি আগতে প্রসেসিং-এর উপর নির্ভর করে নতুন মেসেজ প্রকাশ করতে চান।

এই চাহিদাগুলি পূরণ করার জন্য, রাউটার নামক একটি কম্পোনেন্ট আছে।

মেসেজ রাউটার এর উদাহরণ অ্যাপ্লিকেশন

উদাহরণ অ্যাপ্লিকেশনের প্রবাহ নিম্নলিখিত:

  1. প্রতি সেকেন্ড একটি মেসেজ জেনারেট করুন incoming_messages_topic এ।
  2. struct_handler লিস্টেনার প্রসাবকের ম•sেজটি হ্যান্ডেল করে। একটি মে•sেজ প্রাপ্ত করে, সে UUID ছাপাতে এবং outgoing_messages_topic উপর একটি নতুন মে•sেজ জেনারেট করে।
  3. print_incoming_messages হ্যান্ডলার incoming_messages_topic উপর লিস্টেন করে এবং ম•sেজটির UUID, পেলোড এবং মিটাডেটা ছাপা।
  4. print_outgoing_messages হ্যান্ডলার outgoing_messages_topic উপর লিস্টেন করে এবং ম•sেজটির UUID, পেলোড এবং মিটাডেটা ছাপা। সম্পৃদায় আইডি অবশ্যই হবে incoming_messages_topic উপরের মে•sেজের সাথে একই।

রাউটার কনফিগারেশন

প্রথমে, প্লাগইন এবং মিডলওয়্যার যোগ করে রাউটারটি কনফিগার করুন। তারপরে, রাউটারটি ব্যবহার করবে তা হ্যান্ডলার সেট করুন। প্রতি হ্যান্ডলার স্বতন্ত্রভাবে বার্তা প্রসেস করবে।

github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go এ সম্পূর্ণ সোর্স কোড পাবেন।

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// এই উদাহরণের জন্য, আমরা একটি সহজ লগার ইমপ্লিমেন্টেশন ব্যবহার করছি,
	// আপনি নিজের জন্য `watermill.LoggerAdapter` ইমপ্লিমেন্টেশন প্রদান করতে চাইতে পারেন।
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SIGTERM সিগনাল পেলে, SignalsHandler রাউটারকে gracefully বন্ধ করবে।
	// রাউটারটি আপনি চাইলে `r.Close()` কল করেও বন্ধ করতে পারেন।
	router.AddPlugin(plugin.SignalsHandler)

	// রাউটার-এর লেভেল মিডলওয়্যার প্রতি ম্যাসেজ পাঠানোর জন্য ব্যাবহার করা হবে
	router.AddMiddleware(
		// CorrelationID করেলেশন আইডি বহন ম্যাসেজের মেটাডেটা থেকে উৎপন্ন ম্যাসেজ প্রতি কপি করে
		middleware.CorrelationID,

		// যদি হ্যান্ডলারটি একটি ত্রুটি ফিরিয়ে, তাহলে হ্যান্ডলার ফাংশনটি retry করুন।
		// MaxRetries এর পর ম্যাসেজটি Nacked হবে, এবং PubSub টি এটি পুনরায় প্রেরণ করার জন্য দায়ী থাকবে।
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer হ্যান্ডলারে প্যানিক হ্যান্ডিল করে।
		// এই মামলায়, এটি তাদেরকে ত্রুটি হিসেবে Retry মিডলওয়্যারে পাঠানো হয়।
		middleware.Recoverer,
	)

	// সরলতার জন্য, আমরা এখানে gochannel Pub/Sub ব্যবহার করছি,
	// আপনি এটি যে কোনো Pub/Sub ইমপ্লিমেন্টেশন দিয়ে replace করতে পারেন এবং পারিণাম একই থাকবে।
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// ব্যাকগ্রাউন্ডে কিছু আসতে ম্যাসেজ উৎপন্ন করুন
	go publishMessages(pubSub)

	// AddHandler একটি হ্যান্ডলার ফেরী করে যা ব্যবহার করা যাবে হ্যান্ডলার-লেভেল মিডলওয়্যার যোগ করতে
	// বা হ্যান্ডলার থামাতে।
	handler := router.AddHandler(
		"struct_handler",          // হ্যান্ডলার নাম, অবশ্যই অনন্য
		"incoming_messages_topic", // কোন বিষয় থেকে ইভেন্ট পাঠানো হয়
		pubSub,
		"outgoing_messages_topic", // যে বিষয়ে ইভেন্ট প্রকাশ করা হয়
		pubSub,
		structHandler{}.Handler,
	)

	// হ্যান্ডলার-লেভেল মিডলওয়্যার শুধুমাত্র নির্দিষ্ট হ্যান্ডলারের জন্য ব্যাবহার করা হয়
	// এই রকমের মিডলওয়্যারটি ঠিক একেই উপাদান হিসেবে যোগ করা যেতে পারে যেমন রাউটার লেভেল মিডলওয়্যার
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Executing handler specific middleware for", message.UUID)

			return h(message)
		}
	})

	// ডিবাগিং উদ্দেশ্যে, "incoming_messages_topic" এ প্রাপ্ত সমস্ত ম্যাসেজ মুদ্রণ করছি
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// ডিবাগিং উদ্দেশ্যে, "outgoing_messages_topic" এ প্রেরণের জন্য সমস্ত ইভেন্ট মুদ্রণ করছি
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// এখন যখন সমস্ত হ্যান্ডলার নিবন্ধন হয়েছে, তাহলে আমরা রাউটারকে চালিয়ে আসছি।
	// Run রাউটারটি চলতে থাকবে, যতক্ষণ না তা রাউটার চালু রাখা থাকে।
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

সন্নিবেশিত বার্তা

struct_handler আগমনী বার্তা গুলি থেকে বার্তা নিয়ন্ত্রণ করে, তাই আমরা প্রায়শই পদক্ষেপ নিয়ে বৃহত্তর বার্তা সিমুলেট করি অথবা বেকগ্রাউন্ডে publishMessages() কল করি। দয়া করে লক্ষ্য করুন যে আমরা SetCorrelationID মিডলওয়ার যোগ করেছি। রাউটার সমস্ত তৈরি বার্তাগুলির জন্য একটি সঙ্গতিকর ID যুক্ত করবে (মেটাডেটায় সংরক্ষণ করা)।

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("হ্যালো, বিশ্ব!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("বার্তা প্ঠারা হয়েছে %s, সম্পর্ক চিহ্ন: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

হ্যান্ডলারগুলি

আপনারা এখন দুটি প্রকারের হ্যান্ডলার ফাংশন দেখে এসেছেন:

  1. ফাংশন func(msg *message.Message) ([]*message.Message, error)
  2. মেথড func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

আপনার হ্যান্ডলার যদি কোনো প্রয়োজনীয় নির্ভরণা ছাড়া একটি ফাংশন হয়, তবে প্রথম বিকল্পটি ব্যবহার করা উচিত। যদি আপনার হ্যান্ডলার কিছু নির্ভরতা প্রয়োজন করে (যেমন: ডেটাবেস হ্যান্ডেল, লগার, ইত্যাদি), তাহলে দ্বিতীয় বিকল্পটি ব্যবহার করা উচিত।

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> বার্তা গৃহীত: %s\n> %s\n> মেটাডেটা: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // এখানে আমরা কিছু নির্ভরতা যুক্ত করতে পারি
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler বার্তা পৌঁছে", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("structHandler দ্বারা তৈরি করা বার্তা"))
    return message.Messages{msg}, nil
}

সমাপ্ত!

আপনি এই উদাহরণ চালিয়ে দেখতে পারেন go run main.go