نصب
go get -u github.com/ThreeDotsLabs/watermill
آمادهسازی
ایده اصلی پشت برنامههای مبتنی بر رویداد همیشه یکسان است: گوش دادن به پیامهای ورودی و واکنش نشان دادن به آنها. Watermill از پیادهسازی این رفتار برای چندین منتشرکننده و مشترک حمایت میکند.
بخش اصلی Watermill، Message است، که همانند http.Request
در بستهی http
، اهمیت زیادی دارد. بیشتر ویژگیهای Watermill به نحوی از این ساختار استفاده میکنند.
با وجود ویژگیهای پیچیدهای که توسط کتابخانه PubSub فراهم میشود، برای Watermill تنها لازم است که برای شروع استفاده از آنها، دو رابط Publisher
و Subscriber
پیادهسازی شوند.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
مشترک شدن در پیامها
بیایید با مشترک شدن شروع کنیم. Subscribe
انتظار دارد که یک نام موضوع را بگیرد و یک کانال برای دریافت پیامهای ورودی برگرداند. معنای خاصی از موضوع بسته به پیادهسازی PubSub است.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("دریافت پیام: %s ، بار مفید: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
برای مثالهای دقیقتر از پشتیبانی شدهی PubSub، لطفا به محتوای زیر مراجعه کنید.
مثال Go Channel
کد مثال کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
کد مثال کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("دریافت پیام: %s ، بار مفید: %s\n", msg.UUID, string(msg.Payload))
// باید تأیید کنیم که پیام را دریافت و پردازش کردهایم، در غیر این صورت ممکن است چندین بار ارسال شود.
msg.Ack()
}
}
مثال Kafka
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// معادل auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("پیام دریافت شده: %s, بار مفید: %s", msg.UUID, string(msg.Payload))
// ما نیاز داریم که تأیید کنیم که پیام را دریافت و پردازش کردهایم،
// در غیر این صورت پیام به طور مکرر مجدداً ارسال خواهد شد.
msg.Ack()
}
}
مثال RabbitMQ (AMQP)
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// این تنظیم بر اساس مثال زیر اعمال شده است: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// از آن به عنوان یک صف ساده استفاده میشود.
//
// اگر می خواهید یک سرویس سبک انتشار/اشتراک را پیاده سازی کنید، لطفا به
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups مراجعه کنید.
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("پیام دریافت شده: %s, بار مفید: %s", msg.UUID, string(msg.Payload))
// ما نیاز داریم که تأیید کنیم که پیام دریافت و پردازش شده است،
// در غیر این صورت پیام به طور مکرر مجدداً تحویل داده خواهد شد.
msg.Ack()
}
}
مثال SQL
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// باید پذیرش کنیم که پیام دریافت و پردازش شده است،
// در غیر این صورت، پیام به طور تکراری مجدداً ارسال خواهد شد.
msg.Ack()
}
}
ایجاد پیام
Watermill هیچ فرمت پیامی را نیازمند نمیکند. NewMessage
انتظار دارد که payload یک بایت باشد. میتوانید از رشتهها، JSON، protobuf، Avro، gob یا هر فرمت دیگری که بتواند به []byte
سریالیسازی شود، استفاده کنید.
UUID پیام اختیاری است، اما توصیه میشود چرا که در اشکالزدایی کمک میکند.
msg := message.NewMessage(watermill.NewUUID(), []byte("سلام دنیا!"))
انتشار پیام
متد Publish
نیازمند یک تاپیک و یک یا چند پیام برای انتشار است.
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
مثال Go Channel
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("سلام دنیا!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
مثال Kafka
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("سلام دنیا!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
نمونه RabbitMQ (AMQP)
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("سلام، دنیا!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
نمونه SQL
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "سلام، دنیا!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
استفاده از مسیریاب پیام
انتشارکنندگان و مشترکها بخشهای سطح پایینتر از Watermill هستند. در اکثر موارد، شما به طور معمول میخواهید از رابطها و ویژگیهای سطح بالاتری مانند همبستگیها، معیارها، صفوف زهر پراکنده، تکرارها و محدودیت نرخ استفاده کنید.
ممکن است بخواهید فقط پیام را در صورت پردازش موفق ارجاع دهید. در موارد دیگر ممکن است بخواهید فوراً آن را تأیید کنید و سپس در نظر بگیرید آن را پردازش کنید. گاهی اوقات ممکن است بخواهید اقدامات خاصی بر اساس پیام ورودی انجام دهید و یک پیام دیگر در پاسخ منتشر کنید.
برای تأمین این نیازها، یک مؤلفه به نام مسیریاب وجود دارد.
نمونه برنامه از مسیریاب پیام
جریان برنامه نمونه به شرح زیر است:
- هر ثانیه یک پیام در تاپیک
incoming_messages_topic
تولید شود. - گوش دهیکننده
struct_handler
تاپیکincoming_messages_topic
را پردازش میکند. دریافت پیام، UUID را چاپ کرده و یک پیام جدید در تاپیکoutgoing_messages_topic
تولید میکند. - گوش دهیکننده
print_incoming_messages
بر روی تاپیکincoming_messages_topic
گوش داده و UUID، بار و فراداده پیام را چاپ میکند. - گوش دهیکننده
print_outgoing_messages
بر روی تاپیکoutgoing_messages_topic
گوش داده و UUID، بار و فراداده پیام را چاپ میکند. شناسه همبستگی باید همان پیام در تاپیکincoming_messages_topic
باشد.
پیکربندی روتر
اول، با افزودن افزونه ها و میان افزار، روتر را پیکربندی کنید. سپس، دستگیره هایی را که روتر استفاده خواهد کرد تعیین کنید. هر دستگیره به طور مستقل پیام ها را پردازش خواهد کرد.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// برای این مثال، ما از یک پیاده سازی ساده یاگین استفاده میکنیم،
// شما ممکن است خواهید خواست پیاده سازی خودتان از `watermill.LoggerAdapter` را ارائه دهید.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// زمانی که سیگنال SIGTERM دریافت شود، SignalsHandler به طور متمایل روتر را میبندد.
// همچنین میتوانید با فراخوانی `r.Close()`، روتر را ببندید.
router.AddPlugin(plugin.SignalsHandler)
// میان افزارهای سطح روتر به صورت مستقل برای هر پیامی که به روتر ارسال شده اجرا میشوند.
router.AddMiddleware(
// CorrelationID ایدنتیفای کردن مشترک، شناسه ارتباطی را از متادیتای پیام ورودی به پیام تولید شده کپی میکند.
middleware.CorrelationID,
// اگر دستگیره خطایی را برمیگرداند، توابع دستگیره را مجدداً سعی میکند.
// پس از رسیدن به حداکثر تکرارها، پیام Nack میشود و PubSub مسئول مجدد ارسال آن میباشد.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer با خطاهای دستگیره برخورد میکند.
// در این حالت، آنها را به عنوان خطاها به میان افزار Retry منتقل میکند.
middleware.Recoverer,
)
// به سادگی، از Pub/Sub gochannel برای اینجا استفاده کرده ایم،
// شما میتوانید آن را با هر پیاده سازی Pub/Sub دیگری جایگزین کنید و تاثیر آن مشابه خواهد بود.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// پیام های ورودی را به پس زمینه منتشر میکنیم.
go publishMessages(pubSub)
// AddHandler یک دستگیره ایجاد میکند که میتواند برای افزودن میان افزارهای سطح دستگیره یا متوقف کردن دستگیره استفاده شود.
handler := router.AddHandler(
"struct_handler", // نام دستگیره، باید یکتا باشد
"incoming_messages_topic", // تاپیکی که رویدادها از آن خوانده میشوند
pubSub,
"outgoing_messages_topic", // تاپیکی که رویدادها به آن منتشر میشوند
pubSub,
structHandler{}.Handler,
)
// میان افزارهای سطح دستگیره فقط برای یک دستگیره خاص اجرا میشوند
// این نوع میان افزار میتواند به همان روش میان افزارهای سطح روتر اضافه شود.
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("اجرای میان افزار خاص دستگیره برای", message.UUID)
return h(message)
}
})
// به منظور اشکال زدایی، تمام پیام های دریافت شده در "incoming_messages_topic" را چاپ میکنیم
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// به منظور اشکال زدایی، تمام رویدادهای ارسال شده به "outgoing_messages_topic" چاپ میکنیم
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// حال که همه دستگیره ها ثبت شده اند، ما روتر را اجرا میکنیم.
// Run در حال اجرا روتر بلاک میکند.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
پیامهای ورودی
struct_handler
پیامها را از incoming_messages_topic
مصرف میکند، بنابراین ما ترافیک ورودی را با فراخوانی publishMessages()
در پسزمینه شبیهسازی میکنیم. توجه داشته باشید که ما middleware SetCorrelationID
را اضافه کردیم. روتر یک شناسه همسان را به تمام پیامهای تولید شده اضافه میکند (در metadata ذخیره شده).
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("سلام دنیا!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("درحال ارسال پیام %s، شناسه همسان: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
دستگیرهها
شما ممکن است متوجه شوید که دو نوع تابع دستگیره وجود دارد:
- تابع
func(msg *message.Message) ([]*message.Message, error)
- متد
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
اگر دستگیره شما یک تابع است که وابستگی به هیچ وابستگیهایی ندارد، استفاده از گزینه اول مناسب است. وقتی دستگیره شما احتیاج به برخی از وابستگیها دارد (مانند دسترسی به پایگاه دادهها، loggers و غیره)، گزینه دوم مفید است.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> پیام دریافت شد: %s\n> %s\n> metadata: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// اینجا میتوانیم برخی از وابستگیها را اضافه کنیم
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler پیام را دریافت کرد", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("پیام توسط structHandler تولید شده است"))
return message.Messages{msg}, nil
}
انجام شد!
شما میتوانید این مثال را با استفاده از go run main.go
اجرا کنید.