การติดตั้ง
go get -u github.com/ThreeDotsLabs/watermill
เตรียมการ
หลักการพื้นฐานที่อยู่เบื้องหลังของแอปพลิเคชันที่ใช้งานในลักษณะการรับส่งข้อความเสมอเสมือนกัน: รับฟังข้อความที่เข้ามาและปฏิบัติต่อข้อความเหล่านั้น Watermill สนับสนุนการทำงานแบบนี้สำหรับผู้ส่งและผู้รับมากมาย
ส่วนสำคัญของ Watermill คือ Message ซึ่งมีความสำคัญเท่ากับ http.Request
ในแพ็คเกจ http
ส่วนมากของฟีเจอร์ของ Watermill ใช้โครงสร้างนี้อย่างมีประสิทธิภาพ
สังกัดแกนของไลบรารี PubSub มีความซับซ้อนแต่สำหรับ Watermill เพียงแค่ทำการโอนไปสู่การปฏิบัติสองอินเตอร์เฟสคือ Publisher
และ Subscriber
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
การสับทึกไปยังข้อความ
ขอให้เริ่มจากการสับทึก Subscribe
ต้องการชื่อหัวข้อและคืนค่าแชนเนลเพื่อรับข้อความที่เข้ามา ความหมายเฉพาะของ หัวข้อ ขึ้นอยู่กับการปฏิบัติของ PubSub
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
สำหรับตัวอย่างที่เป็นละเอียดเกี่ยวกับการสับทึกช่องสับทึกที่สนับสนุน กรุณาอ้างถึงเนื้อหารายละเอียดต่อไปนี้
ตัวอย่างช่อง Go
โค้ดตัวอย่างเต็ม: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
โค้ดตัวอย่างเต็ม: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
// เราต้องยืนยันว่าเราได้รับและประมวลผลข้อความแล้ว
// ไม่เช่นนั้นมันจะถูกรับอีกหลายครั้ง
msg.Ack()
}
}
ตัวอย่าง Kafka
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// เทียบเท่ากับ auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("ได้รับข้อความ: %s, ข้อมูล: %s", msg.UUID, string(msg.Payload))
// เราต้องยืนยันว่าเราได้รับและประมวลผลข้อความแล้ว
// มิฉะนั้นข้อความจะถูกส่งซ้ำอย่างต่อเนื่อง
msg.Ack()
}
}
ตัวอย่าง RabbitMQ (AMQP)
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// การกำหนดค่านี้ขึ้นอยู่กับตัวอย่างต่อไปนี้: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// ใช้เป็นคิวงานอย่างง่าย
//
// หากต้องการดำเนินการเฉพาะในรูปแบบการให้บริการแบบ Pub/Sub โปรดอ้างถึง
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("ได้รับข้อความ: %s, ข้อมูล: %s", msg.UUID, string(msg.Payload))
// เราต้องยืนยันว่าเราได้รับและประมวลผลข้อความแล้ว
// มิฉะนั้นข้อความจะถูกส่งซ้ำอย่างต่อเนื่อง
msg.Ack()
}
}
ตัวอย่าง SQL
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// เราต้องยืนยันว่าข้อความได้รับและประมวลผลแล้ว
// มิฉะนั้น ข้อความจะถูกส่งซ้ำอย่างไม่มีที่สิ้นสุด
msg.Ack()
}
}
สร้างข้อความ
Watermill ไม่บังคับรูปแบบข้อความใด ๆ ฟังก์ชัน NewMessage
ต้องการให้ payload เป็นอาร์เรย์ไบต์ คุณสามารถใช้ strings, JSON, protobuf, Avro, gob หรือรูปแบบอื่น ๆ ที่สามารถถูกซีเรียไลซ์เป็น []byte
ได้
UUID ของข้อความเป็นส่วนเสริม แต่แนะนำให้ใช้เนื่องจากจะช่วยในการแก้ปัญหาข้อผิดพลาด
msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!")
ตีพิมพ์ข้อความ
เมะทอด ตีพิมพ์
ต้องการหัวข้อ และข้อความหรือมากกว่าหนึ่งข้อความที่จะตีพิมพ์
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
ตัวอย่าง Go Channel
โค้ดแบบเต็ม: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
ตัวอย่าง Kafka
โค้ดแบบเต็ม: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
ตัวอย่าง RabbitMQ (AMQP)
รหัสทั้งหมด: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
ตัวอย่าง SQL
รหัสทั้งหมด: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "สวัสดี, โลก!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
การใช้ Message Router
ผู้เผยแพร่และผู้สับที่ เป็นส่วนย่อยของ Watermill ในกรณีส่วนมากคุณอาจต้องการใช้อินเตอร์เฟซและคุณสมบัติระดับสูง เช่น ความสัมพันธ์ เมตริกส์ คิวที่เป็นพิษ การลองใหม่ และการจำกัดอัตรา
บางครั้งคุณอาจต้องการยอมรับข้อความหนึ่งครั้งก่อนที่จะดำเนินการประมวลผลเสร็จสิ้น ในกรณีอื่น คุณอาจต้องการยอมรับทันทีแล้วคิดการประมวลผลต่อไป บางครั้งคุณอาจต้องการดำเนินการบางอย่างตามข้อความที่เข้าและเผยแพร่ข้อความอื่นๆ เพื่อตอบโต้
เพื่อตอบสนองต่อความต้องการเหล่านี้ มีองค์ประกอบที่เรียกว่า Router.
ตัวอย่างการใช้งาน Message Router
การไหลของแอปพลิเคชันตัวอย่างดังนี้:
- สร้างข้อความบน
incoming_messages_topic
ทุกๆ วินาที - ตัวฟังก์ชัน
struct_handler
รับข้อมูลจากincoming_messages_topic
ภายในกระบวนการรับข้อมูล หลังจากได้รับข้อความ จะปริ้น UUID และสร้างข้อความใหม่บนoutgoing_messages_topic
- ตัวฟังก์ชัน
print_incoming_messages
รับข้อมูลจากincoming_messages_topic
และปริ้น UUID, ข้อมูล, และเมตาดาต้าของข้อความ - ตัวฟังก์ชัน
print_outgoing_messages
รับข้อมูลจากoutgoing_messages_topic
และปริ้น UUID, ข้อมูล, และเมตาดาต้าของข้อความ ความสัมพันธ์ควรเหมือนกันกับข้อความบนincoming_messages_topic
การกำหนดค่า Router
ก่อนอื่น ให้กำหนดค่า Router โดยการเพิ่มปลั๊กอินและมิดเดิลแวร์ แล้วตั้งค่าแฮนด์เลอร์ที่ Router จะใช้ แต่ละแฮนด์เลอร์จะประมวลผลข้อความโดยอิสระ
โค้ดซอร์สสมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// For this example, we use a simple logger implementation,
// you may want to provide your own `watermill.LoggerAdapter` implementation.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// เมื่อได้รับสัญญาณ SIGTERM SignalsHandler จะปิด Router อย่างราบรื่น
// คุณยังสามารถปิด Router โดยการเรียก `r.Close()`
router.AddPlugin(plugin.SignalsHandler)
// มิดเดิลแวร์ในระดับ Router จะถูกประมวลผลสำหรับทุกข้อความที่ส่งไปยัง Router
router.AddMiddleware(
// CorrelationID คัดลอก correlation ID จาก metadata ของข้อความที่เข้ามาไปยังข้อความที่สร้างขึ้น
middleware.CorrelationID,
// หาก handler ส่งคืน error ให้ลองส่งฟังก์ชัน handler อีกครั้ง
// เมื่อถึง MaxRetries แล้วข้อความจะถูก Nacked และ PubSub รับผิดชอบในการส่งข้อความไปใหม่
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer จัดการกับ panics ใน handler
// ในกรณีนี้ จะส่งมันเป็น errors ไปยัง Retry middleware
middleware.Recoverer,
)
// เพื่อความง่าย เราใช้ gochannel Pub/Sub ที่นี่
// คุณสามารถแทนที่ด้วยการประมวลผล Pub/Sub ใดก็ได้ และผลลัพธ์ก็จะเหมือนเดิม
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// สร้างข้อความขาเข้าบางอย่างขึ้นมาในพื้นหลัง
go publishMessages(pubSub)
// AddHandler ส่งคืน handler ที่สามารถใช้สำหรับเพิ่มมิดเดิลแวร์ในระดับ handler
// หรือหยุด handler
handler := router.AddHandler(
"struct_handler", // ชื่อ Handler ต้องไม่ซ้ำ
"incoming_messages_topic", // หัวข้อที่อ่านเหตุการณ์จาก
pubSub,
"outgoing_messages_topic", // หัวข้อที่เผยแพร่เหตุการณ์ไปยัง
pubSub,
structHandler{}.Handler,
)
// มิดเดิลแวร์ในระดับ Handler จะถูกทำงานเฉพาะสำหรับ handler เฉพาะ
// มิดเดิลแวร์ประเภทนี้สามารถเพิ่มได้ตามการประมวลผลสำหรับ Router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("กำลังประมวลผลมิดเดิลแวร์เฉพาะสำหรับ", message.UUID)
return h(message)
}
})
// เพื่อวัตถุประสงค์ในการ debug เราพิมพ์ข้อความที่ได้รับทั้งหมดบน "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// เพื่อเป็นการ debug เราพิมพ์เหตุการณ์ทั้งหมดที่ส่งไปยัง "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// ตอนนี้หลังจากลงทะเบียนแฮนด์เลอร์ทั้งหมดเรากำลังเรียกใช้ router
// Run บล็อกขณะที่ Router กำลังทำงาน
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
ข้อความขาเข้า
struct_handler
ทำการใช้งานข้อความจาก incoming_messages_topic
ดังนั้นเราจะจำลองการจราจรขาเข้าโดยการเรียกใช้ publishMessages()
ในพื้นหลัง โปรดทราบว่าเราได้เพิ่ม SetCorrelationID
middleware แล้ว ตัวเราเตอร์จะเพิ่ม correlation ID ในข้อความที่สร้างขึ้นทั้งหมด (จัดเก็บใน metadata)
โค้ดต้นฉบับ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("สวัสดี, โลก!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("กำลังส่งข้อความ %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Handlers
คุณอาจจะเห็นว่ามีสองประเภทของ ฟังก์ชันขั้นสูง:
- ฟังก์ชัน
func(msg *message.Message) ([]*message.Message, error)
- เมธอด
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
หาก handler ของคุณเป็นฟังก์ชันที่ไม่ได้ขึ้นอยู่กับความจำเป็นใด ๆ การใช้ตัวเลือกแรกก็เป็นเรื่องปกติ แต่ถ้า handler ของคุณต้องการความจำเป็นบางอย่าง (เช่น การจัดการข้อมูลฐานข้อมูล บันทึกข้อมูลเหตุการณ์ เป็นต้น) ตัวเลือกที่สองจะมีประโยชน์
โค้ดต้นฉบับ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> ได้รับข้อความ: %s\n> %s\n> metadata: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// ที่นี่เราสามารถเพิ่มความจำเป็นบางอย่างได้
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler ได้รับข้อความ", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("ข้อความที่ถูกสร้างโดย structHandler"))
return message.Messages{msg}, nil
}
เรียบร้อยแล้ว!
คุณสามารถรันตัวอย่างนี้โดย go run main.go
ได้