نصب

go get -u github.com/ThreeDotsLabs/watermill

آماده‌سازی

ایده اصلی پشت برنامه‌های مبتنی بر رویداد همیشه یکسان است: گوش دادن به پیام‌های ورودی و واکنش نشان دادن به آن‌ها. Watermill از پیاده‌سازی این رفتار برای چندین منتشرکننده و مشترک حمایت می‌کند.

بخش اصلی Watermill، Message است، که همانند http.Request در بسته‌ی http، اهمیت زیادی دارد. بیشتر ویژگی‌های Watermill به نحوی از این ساختار استفاده می‌کنند.

با وجود ویژگی‌های پیچیده‌ای که توسط کتابخانه PubSub فراهم می‌شود، برای Watermill تنها لازم است که برای شروع استفاده از آن‌ها، دو رابط Publisher و Subscriber پیاده‌سازی شوند.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

مشترک شدن در پیام‌ها

بیایید با مشترک شدن شروع کنیم. Subscribe انتظار دارد که یک نام موضوع را بگیرد و یک کانال برای دریافت پیام‌های ورودی برگرداند. معنای خاصی از موضوع بسته به پیاده‌سازی PubSub است.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("دریافت پیام: %s ، بار مفید: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

برای مثال‌های دقیق‌تر از پشتیبانی شده‌ی PubSub، لطفا به محتوای زیر مراجعه کنید.

مثال Go Channel

کد مثال کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

کد مثال کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("دریافت پیام: %s ، بار مفید: %s\n", msg.UUID, string(msg.Payload))

	// باید تأیید کنیم که پیام را دریافت و پردازش کرده‌ایم، در غیر این صورت ممکن است چندین بار ارسال شود.
	msg.Ack()
	}
}

مثال Kafka

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// معادل auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("پیام دریافت شده: %s, بار مفید: %s", msg.UUID, string(msg.Payload))

		// ما نیاز داریم که تأیید کنیم که پیام را دریافت و پردازش کرده‌ایم،
		// در غیر این صورت پیام به طور مکرر مجدداً ارسال خواهد شد.
		msg.Ack()
	}
}

مثال RabbitMQ (AMQP)

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// این تنظیم بر اساس مثال زیر اعمال شده است: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// از آن به عنوان یک صف ساده استفاده می‌شود.
		//
		// اگر می خواهید یک سرویس سبک انتشار/اشتراک را پیاده سازی کنید، لطفا به
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups مراجعه کنید.
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("پیام دریافت شده: %s, بار مفید: %s", msg.UUID, string(msg.Payload))

		// ما نیاز داریم که تأیید کنیم که پیام دریافت و پردازش شده است،
		// در غیر این صورت پیام به طور مکرر مجدداً تحویل داده خواهد شد.
		msg.Ack()
	}
}

مثال SQL

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// باید پذیرش کنیم که پیام دریافت و پردازش شده است،
		// در غیر این صورت، پیام به طور تکراری مجدداً ارسال خواهد شد.
		msg.Ack()
	}
}

ایجاد پیام

Watermill هیچ فرمت پیامی را نیازمند نمی‌کند. NewMessage انتظار دارد که payload یک بایت باشد. می‌توانید از رشته‌ها، JSON، protobuf، Avro، gob یا هر فرمت دیگری که بتواند به []byte سریالی‌سازی شود، استفاده کنید.

UUID پیام اختیاری است، اما توصیه می‌شود چرا که در اشکال‌زدایی کمک می‌کند.

msg := message.NewMessage(watermill.NewUUID(), []byte("سلام دنیا!"))

انتشار پیام

متد Publish نیازمند یک تاپیک و یک یا چند پیام برای انتشار است.

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

مثال Go Channel

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("سلام دنیا!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

مثال Kafka

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("سلام دنیا!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

نمونه RabbitMQ (AMQP)

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("سلام، دنیا!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

نمونه SQL

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "سلام، دنیا!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

استفاده از مسیریاب پیام

انتشارکنندگان و مشترک‌ها بخش‌های سطح پایین‌تر از Watermill هستند. در اکثر موارد، شما به طور معمول می‌خواهید از رابط‌ها و ویژگی‌های سطح بالاتری مانند همبستگی‌ها، معیارها، صفوف زهر پراکنده، تکرارها و محدودیت نرخ استفاده کنید.

ممکن است بخواهید فقط پیام را در صورت پردازش موفق ارجاع دهید. در موارد دیگر ممکن است بخواهید فوراً آن را تأیید کنید و سپس در نظر بگیرید آن را پردازش کنید. گاهی اوقات ممکن است بخواهید اقدامات خاصی بر اساس پیام ورودی انجام دهید و یک پیام دیگر در پاسخ منتشر کنید.

برای تأمین این نیازها، یک مؤلفه به نام مسیریاب وجود دارد.

نمونه برنامه از مسیریاب پیام

جریان برنامه نمونه به شرح زیر است:

  1. هر ثانیه یک پیام در تاپیک incoming_messages_topic تولید شود.
  2. گوش دهی‌کننده struct_handler تاپیک incoming_messages_topic را پردازش می‌کند. دریافت پیام، UUID را چاپ کرده و یک پیام جدید در تاپیک outgoing_messages_topic تولید می‌کند.
  3. گوش دهی‌کننده print_incoming_messages بر روی تاپیک incoming_messages_topic گوش داده و UUID، بار و فراداده پیام را چاپ می‌کند.
  4. گوش دهی‌کننده print_outgoing_messages بر روی تاپیک outgoing_messages_topic گوش داده و UUID، بار و فراداده پیام را چاپ می‌کند. شناسه همبستگی باید همان پیام در تاپیک incoming_messages_topic باشد.

پیکربندی روتر

اول، با افزودن افزونه ها و میان افزار، روتر را پیکربندی کنید. سپس، دستگیره هایی را که روتر استفاده خواهد کرد تعیین کنید. هر دستگیره به طور مستقل پیام ها را پردازش خواهد کرد.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// برای این مثال، ما از یک پیاده سازی ساده یاگین استفاده میکنیم،
	// شما ممکن است خواهید خواست پیاده سازی خودتان از `watermill.LoggerAdapter` را ارائه دهید.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// زمانی که سیگنال SIGTERM دریافت شود، SignalsHandler به طور متمایل روتر را میبندد.
	// همچنین میتوانید با فراخوانی `r.Close()`، روتر را ببندید.
	router.AddPlugin(plugin.SignalsHandler)

	// میان افزارهای سطح روتر به صورت مستقل برای هر پیامی که به روتر ارسال شده اجرا میشوند.
	router.AddMiddleware(
		// CorrelationID ایدنتیفای کردن مشترک، شناسه ارتباطی را از متادیتای پیام ورودی به پیام تولید شده کپی میکند.
		middleware.CorrelationID,

		// اگر دستگیره خطایی را برمیگرداند، توابع دستگیره را مجدداً سعی میکند.
		// پس از رسیدن به حداکثر تکرارها، پیام Nack میشود و PubSub مسئول مجدد ارسال آن میباشد.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer با خطاهای دستگیره برخورد میکند.
		// در این حالت، آنها را به عنوان خطاها به میان افزار Retry منتقل میکند.
		middleware.Recoverer,
	)

	// به سادگی، از Pub/Sub gochannel برای اینجا استفاده کرده ایم،
	// شما میتوانید آن را با هر پیاده سازی Pub/Sub دیگری جایگزین کنید و تاثیر آن مشابه خواهد بود.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// پیام های ورودی را به پس زمینه منتشر میکنیم.
	go publishMessages(pubSub)

	// AddHandler یک دستگیره ایجاد میکند که میتواند برای افزودن میان افزارهای سطح دستگیره یا متوقف کردن دستگیره استفاده شود.
	handler := router.AddHandler(
		"struct_handler",          // نام دستگیره، باید یکتا باشد
		"incoming_messages_topic", // تاپیکی که رویدادها از آن خوانده میشوند
		pubSub,
		"outgoing_messages_topic", // تاپیکی که رویدادها به آن منتشر میشوند
		pubSub,
		structHandler{}.Handler,
	)

	// میان افزارهای سطح دستگیره فقط برای یک دستگیره خاص اجرا میشوند
	// این نوع میان افزار میتواند به همان روش میان افزارهای سطح روتر اضافه شود.
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("اجرای میان افزار خاص دستگیره برای", message.UUID)

			return h(message)
		}
	})

	// به منظور اشکال زدایی، تمام پیام های دریافت شده در "incoming_messages_topic" را چاپ میکنیم
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// به منظور اشکال زدایی، تمام رویدادهای ارسال شده به "outgoing_messages_topic" چاپ میکنیم
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// حال که همه دستگیره ها ثبت شده اند، ما روتر را اجرا میکنیم.
	// Run در حال اجرا روتر بلاک میکند.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

پیام‌های ورودی

struct_handler پیام‌ها را از incoming_messages_topic مصرف می‌کند، بنابراین ما ترافیک ورودی را با فراخوانی publishMessages() در پس‌زمینه شبیه‌سازی می‌کنیم. توجه داشته باشید که ما middleware SetCorrelationID را اضافه کردیم. روتر یک شناسه همسان را به تمام پیام‌های تولید شده اضافه می‌کند (در metadata ذخیره شده).

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("سلام دنیا!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("درحال ارسال پیام %s، شناسه همسان: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

دستگیره‌ها

شما ممکن است متوجه شوید که دو نوع تابع دستگیره وجود دارد:

  1. تابع func(msg *message.Message) ([]*message.Message, error)
  2. متد func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

اگر دستگیره شما یک تابع است که وابستگی به هیچ وابستگی‌هایی ندارد، استفاده از گزینه اول مناسب است. وقتی دستگیره شما احتیاج به برخی از وابستگی‌ها دارد (مانند دسترسی به پایگاه داده‌ها، loggers و غیره)، گزینه دوم مفید است.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> پیام دریافت شد: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // اینجا می‌توانیم برخی از وابستگی‌ها را اضافه کنیم
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler پیام را دریافت کرد", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("پیام توسط structHandler تولید شده است"))
    return message.Messages{msg}, nil
}

انجام شد!

شما می‌توانید این مثال را با استفاده از go run main.go اجرا کنید.