การติดตั้ง

go get -u github.com/ThreeDotsLabs/watermill

เตรียมการ

หลักการพื้นฐานที่อยู่เบื้องหลังของแอปพลิเคชันที่ใช้งานในลักษณะการรับส่งข้อความเสมอเสมือนกัน: รับฟังข้อความที่เข้ามาและปฏิบัติต่อข้อความเหล่านั้น Watermill สนับสนุนการทำงานแบบนี้สำหรับผู้ส่งและผู้รับมากมาย

ส่วนสำคัญของ Watermill คือ Message ซึ่งมีความสำคัญเท่ากับ http.Request ในแพ็คเกจ http ส่วนมากของฟีเจอร์ของ Watermill ใช้โครงสร้างนี้อย่างมีประสิทธิภาพ

สังกัดแกนของไลบรารี PubSub มีความซับซ้อนแต่สำหรับ Watermill เพียงแค่ทำการโอนไปสู่การปฏิบัติสองอินเตอร์เฟสคือ Publisher และ Subscriber

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

การสับทึกไปยังข้อความ

ขอให้เริ่มจากการสับทึก Subscribe ต้องการชื่อหัวข้อและคืนค่าแชนเนลเพื่อรับข้อความที่เข้ามา ความหมายเฉพาะของ หัวข้อ ขึ้นอยู่กับการปฏิบัติของ PubSub

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

สำหรับตัวอย่างที่เป็นละเอียดเกี่ยวกับการสับทึกช่องสับทึกที่สนับสนุน กรุณาอ้างถึงเนื้อหารายละเอียดต่อไปนี้

ตัวอย่างช่อง Go

โค้ดตัวอย่างเต็ม: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

โค้ดตัวอย่างเต็ม: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

		// เราต้องยืนยันว่าเราได้รับและประมวลผลข้อความแล้ว
		// ไม่เช่นนั้นมันจะถูกรับอีกหลายครั้ง
		msg.Ack()
	}
}

ตัวอย่าง Kafka

โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// เทียบเท่ากับ auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("ได้รับข้อความ: %s, ข้อมูล: %s", msg.UUID, string(msg.Payload))

		// เราต้องยืนยันว่าเราได้รับและประมวลผลข้อความแล้ว
		// มิฉะนั้นข้อความจะถูกส่งซ้ำอย่างต่อเนื่อง
		msg.Ack()
	}
}

ตัวอย่าง RabbitMQ (AMQP)

โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// การกำหนดค่านี้ขึ้นอยู่กับตัวอย่างต่อไปนี้: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// ใช้เป็นคิวงานอย่างง่าย
		//
		// หากต้องการดำเนินการเฉพาะในรูปแบบการให้บริการแบบ Pub/Sub โปรดอ้างถึง
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("ได้รับข้อความ: %s, ข้อมูล: %s", msg.UUID, string(msg.Payload))

		// เราต้องยืนยันว่าเราได้รับและประมวลผลข้อความแล้ว
		// มิฉะนั้นข้อความจะถูกส่งซ้ำอย่างต่อเนื่อง
		msg.Ack()
	}
}

ตัวอย่าง SQL

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// เราต้องยืนยันว่าข้อความได้รับและประมวลผลแล้ว
		// มิฉะนั้น ข้อความจะถูกส่งซ้ำอย่างไม่มีที่สิ้นสุด
		msg.Ack()
	}
}

สร้างข้อความ

Watermill ไม่บังคับรูปแบบข้อความใด ๆ ฟังก์ชัน NewMessage ต้องการให้ payload เป็นอาร์เรย์ไบต์ คุณสามารถใช้ strings, JSON, protobuf, Avro, gob หรือรูปแบบอื่น ๆ ที่สามารถถูกซีเรียไลซ์เป็น []byte ได้

UUID ของข้อความเป็นส่วนเสริม แต่แนะนำให้ใช้เนื่องจากจะช่วยในการแก้ปัญหาข้อผิดพลาด

msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!")

ตีพิมพ์ข้อความ

เมะทอด ตีพิมพ์ ต้องการหัวข้อ และข้อความหรือมากกว่าหนึ่งข้อความที่จะตีพิมพ์

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

ตัวอย่าง Go Channel

โค้ดแบบเต็ม: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

ตัวอย่าง Kafka

โค้ดแบบเต็ม: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

ตัวอย่าง RabbitMQ (AMQP)

รหัสทั้งหมด: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

ตัวอย่าง SQL

รหัสทั้งหมด: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "สวัสดี, โลก!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

การใช้ Message Router

ผู้เผยแพร่และผู้สับที่ เป็นส่วนย่อยของ Watermill ในกรณีส่วนมากคุณอาจต้องการใช้อินเตอร์เฟซและคุณสมบัติระดับสูง เช่น ความสัมพันธ์ เมตริกส์ คิวที่เป็นพิษ การลองใหม่ และการจำกัดอัตรา

บางครั้งคุณอาจต้องการยอมรับข้อความหนึ่งครั้งก่อนที่จะดำเนินการประมวลผลเสร็จสิ้น ในกรณีอื่น คุณอาจต้องการยอมรับทันทีแล้วคิดการประมวลผลต่อไป บางครั้งคุณอาจต้องการดำเนินการบางอย่างตามข้อความที่เข้าและเผยแพร่ข้อความอื่นๆ เพื่อตอบโต้

เพื่อตอบสนองต่อความต้องการเหล่านี้ มีองค์ประกอบที่เรียกว่า Router.

ตัวอย่างการใช้งาน Message Router

การไหลของแอปพลิเคชันตัวอย่างดังนี้:

  1. สร้างข้อความบน incoming_messages_topic ทุกๆ วินาที
  2. ตัวฟังก์ชัน struct_handler รับข้อมูลจาก incoming_messages_topic ภายในกระบวนการรับข้อมูล หลังจากได้รับข้อความ จะปริ้น UUID และสร้างข้อความใหม่บน outgoing_messages_topic
  3. ตัวฟังก์ชัน print_incoming_messages รับข้อมูลจาก incoming_messages_topic และปริ้น UUID, ข้อมูล, และเมตาดาต้าของข้อความ
  4. ตัวฟังก์ชัน print_outgoing_messages รับข้อมูลจาก outgoing_messages_topic และปริ้น UUID, ข้อมูล, และเมตาดาต้าของข้อความ ความสัมพันธ์ควรเหมือนกันกับข้อความบน incoming_messages_topic

การกำหนดค่า Router

ก่อนอื่น ให้กำหนดค่า Router โดยการเพิ่มปลั๊กอินและมิดเดิลแวร์ แล้วตั้งค่าแฮนด์เลอร์ที่ Router จะใช้ แต่ละแฮนด์เลอร์จะประมวลผลข้อความโดยอิสระ

โค้ดซอร์สสมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// For this example, we use a simple logger implementation,
	// you may want to provide your own `watermill.LoggerAdapter` implementation.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// เมื่อได้รับสัญญาณ SIGTERM SignalsHandler จะปิด Router อย่างราบรื่น
	// คุณยังสามารถปิด Router โดยการเรียก `r.Close()` 
	router.AddPlugin(plugin.SignalsHandler)

	// มิดเดิลแวร์ในระดับ Router จะถูกประมวลผลสำหรับทุกข้อความที่ส่งไปยัง Router
	router.AddMiddleware(
		// CorrelationID คัดลอก correlation ID จาก metadata ของข้อความที่เข้ามาไปยังข้อความที่สร้างขึ้น
		middleware.CorrelationID,

		// หาก handler ส่งคืน error ให้ลองส่งฟังก์ชัน handler อีกครั้ง
		// เมื่อถึง MaxRetries แล้วข้อความจะถูก Nacked และ PubSub รับผิดชอบในการส่งข้อความไปใหม่
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer จัดการกับ panics ใน handler
		// ในกรณีนี้ จะส่งมันเป็น errors ไปยัง Retry middleware
		middleware.Recoverer,
	)

	// เพื่อความง่าย เราใช้ gochannel Pub/Sub ที่นี่
	// คุณสามารถแทนที่ด้วยการประมวลผล Pub/Sub ใดก็ได้ และผลลัพธ์ก็จะเหมือนเดิม
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// สร้างข้อความขาเข้าบางอย่างขึ้นมาในพื้นหลัง
	go publishMessages(pubSub)

	// AddHandler ส่งคืน handler ที่สามารถใช้สำหรับเพิ่มมิดเดิลแวร์ในระดับ handler
	// หรือหยุด handler
	handler := router.AddHandler(
		"struct_handler",          // ชื่อ Handler ต้องไม่ซ้ำ
		"incoming_messages_topic", // หัวข้อที่อ่านเหตุการณ์จาก
		pubSub,
		"outgoing_messages_topic", // หัวข้อที่เผยแพร่เหตุการณ์ไปยัง
		pubSub,
		structHandler{}.Handler,
	)

	// มิดเดิลแวร์ในระดับ Handler จะถูกทำงานเฉพาะสำหรับ handler เฉพาะ
	// มิดเดิลแวร์ประเภทนี้สามารถเพิ่มได้ตามการประมวลผลสำหรับ Router
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("กำลังประมวลผลมิดเดิลแวร์เฉพาะสำหรับ", message.UUID)

			return h(message)
		}
	})

	// เพื่อวัตถุประสงค์ในการ debug เราพิมพ์ข้อความที่ได้รับทั้งหมดบน "incoming_messages_topic"
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// เพื่อเป็นการ debug เราพิมพ์เหตุการณ์ทั้งหมดที่ส่งไปยัง "outgoing_messages_topic"
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// ตอนนี้หลังจากลงทะเบียนแฮนด์เลอร์ทั้งหมดเรากำลังเรียกใช้ router
	// Run บล็อกขณะที่ Router กำลังทำงาน
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

ข้อความขาเข้า

struct_handler ทำการใช้งานข้อความจาก incoming_messages_topic ดังนั้นเราจะจำลองการจราจรขาเข้าโดยการเรียกใช้ publishMessages() ในพื้นหลัง โปรดทราบว่าเราได้เพิ่ม SetCorrelationID middleware แล้ว ตัวเราเตอร์จะเพิ่ม correlation ID ในข้อความที่สร้างขึ้นทั้งหมด (จัดเก็บใน metadata)

โค้ดต้นฉบับ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("กำลังส่งข้อความ %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

Handlers

คุณอาจจะเห็นว่ามีสองประเภทของ ฟังก์ชันขั้นสูง:

  1. ฟังก์ชัน func(msg *message.Message) ([]*message.Message, error)
  2. เมธอด func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

หาก handler ของคุณเป็นฟังก์ชันที่ไม่ได้ขึ้นอยู่กับความจำเป็นใด ๆ การใช้ตัวเลือกแรกก็เป็นเรื่องปกติ แต่ถ้า handler ของคุณต้องการความจำเป็นบางอย่าง (เช่น การจัดการข้อมูลฐานข้อมูล บันทึกข้อมูลเหตุการณ์ เป็นต้น) ตัวเลือกที่สองจะมีประโยชน์

โค้ดต้นฉบับ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> ได้รับข้อความ: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // ที่นี่เราสามารถเพิ่มความจำเป็นบางอย่างได้
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler ได้รับข้อความ", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("ข้อความที่ถูกสร้างโดย structHandler"))
    return message.Messages{msg}, nil
}

เรียบร้อยแล้ว!

คุณสามารถรันตัวอย่างนี้โดย go run main.go ได้