Cài đặt
go get -u github.com/ThreeDotsLabs/watermill
Chuẩn bị
Ý tưởng cơ bản đằng sau ứng dụng dựa trên sự kiện luôn giống nhau: nghe các tin nhắn đến và phản ứng với chúng. Watermill hỗ trợ việc thực hiện hành vi này cho nhiều nhà xuất bản và người đăng ký.
Phần cốt lõi của Watermill là Message, mà quan trọng như http.Request
trong gói http
. Hầu hết các tính năng của Watermill đều sử dụng cấu trúc này một cách nào đó.
Mặc dù có tính năng phức tạp được cung cấp bởi thư viện PubSub, nhưng đối với Watermill, chỉ cần triển khai hai giao diện để bắt đầu sử dụng chúng: Publisher
và Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Theo dõi Tin nhắn
Hãy bắt đầu bằng việc đăng ký. Subscribe
mong đợi một tên chủ đề và trả về một kênh để nhận các tin nhắn đến. Ý nghĩa cụ thể của topic phụ thuộc vào việc triển khai của PubSub.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Để biết các ví dụ chi tiết của PubSub được hỗ trợ, vui lòng tham khảo nội dung sau.
Ví dụ Go Channel
Mã ví dụ hoàn chỉnh: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Mã ví dụ hoàn chỉnh: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
// Chúng ta cần xác nhận rằng chúng ta đã nhận và xử lý tin nhắn,
// nếu không, nó sẽ được gửi lại nhiều lần.
msg.Ack()
}
}
Ví dụ Kafka
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Tương đương với auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// Chúng ta cần phải xác nhận rằng chúng ta đã nhận và xử lý thông điệp,
// nếu không, thông điệp sẽ được gửi lại lặp đi lặp lại.
msg.Ack()
}
}
Ví dụ RabbitMQ (AMQP)
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Cấu hình này dựa trên ví dụ sau: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Nó được sử dụng như một hàng đợi đơn giản.
//
// Nếu bạn muốn triển khai dịch vụ kiểu Pub/Sub, vui lòng tham khảo
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// Chúng ta cần phải xác nhận rằng thông điệp đã được nhận và xử lý,
// nếu không, thông điệp sẽ được gửi lại lặp đi lặp lại.
msg.Ack()
}
}
Ví dụ SQL
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Nhận thông điệp: %s, tải: %s", msg.UUID, string(msg.Payload))
// Chúng ta cần phải xác nhận rằng thông điệp đã được nhận và xử lý,
// nếu không, thông điệp sẽ được gửi lại một cách liên tục.
msg.Ack()
}
}
Tạo thông điệp
Watermill không bắt buộc bất kỳ định dạng thông điệp nào. NewMessage
mong đợi tải được là một mảng byte. Bạn có thể sử dụng chuỗi, JSON, protobuf, Avro, gob hoặc bất kỳ định dạng khác có thể được tuần tự hóa thành []byte
.
UUID thông điệp là tùy chọn, nhưng được khuyến nghị vì nó hỗ trợ việc gỡ lỗi.
msg := message.NewMessage(watermill.NewUUID(), []byte("Xin chào, thế giới!"))
Đăng bài thông điệp
Phương thức Publish
yêu cầu một chủ đề và một hoặc nhiều thông điệp để xuất bản.
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
Ví dụ Kênh Go
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Xin chào, thế giới!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Ví dụ Kafka
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Xin chào, thế giới!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Ví dụ RabbitMQ (AMQP)
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Xin chào, thế giới!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Ví dụ SQL
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Xin chào, thế giới!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Sử dụng Message Router
Publishers và Subscribers là các phần cấp thấp của Watermill. Trong hầu hết các trường hợp, bạn thông thường muốn sử dụng các giao diện và tính năng cấp cao hơn như correlations, metrics, poison queues, retries và rate limiting.
Bạn có thể chỉ muốn xác nhận tin nhắn sau khi nó đã được xử lý thành công. Trong các trường hợp khác, bạn có thể muốn xác nhận ngay lập tức và sau đó xem xét xử lý nó. Đôi khi, bạn có thể muốn thực hiện một số hành động dựa trên tin nhắn đầu vào và đăng thông báo khác như phản ứng.
Để đáp ứng những yêu cầu này, có một thành phần gọi là Router.
Ví dụ Ứng dụng của Message Router
Luồng của ứng dụng ví dụ như sau:
- Tạo một tin nhắn trên
incoming_messages_topic
mỗi giây. - Người nghe
struct_handler
xử lýincoming_messages_topic
. Khi nhận được một tin nhắn, nó in ra UUID và tạo một tin nhắn mới trênoutgoing_messages_topic
. - Người nghe
print_incoming_messages
lắng nghe trênincoming_messages_topic
và in ra UUID, dữ liệu và siêu dữ liệu của tin nhắn. - Người nghe
print_outgoing_messages
lắng nghe trênoutgoing_messages_topic
và in ra UUID, dữ liệu và siêu dữ liệu của tin nhắn. Correlation ID nên giống với tin nhắn trênincoming_messages_topic
.
Cấu hình Router
Đầu tiên, cấu hình router bằng cách thêm plugin và middleware. Sau đó, thiết lập các trình xử lý mà router sẽ sử dụng. Mỗi trình xử lý sẽ xử lý các tin nhắn một cách độc lập.
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Trong ví dụ này, chúng ta sử dụng một triển khai logger đơn giản,
// bạn có thể muốn cung cấp triển khai `watermill.LoggerAdapter` riêng của bạn.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Khi nhận được tín hiệu SIGTERM, SignalsHandler sẽ đóng Router một cách dễ dàng.
// Bạn cũng có thể đóng router bằng cách gọi `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Middleware cấp Router sẽ được thực thi cho mỗi tin nhắn gửi đến router
router.AddMiddleware(
// CorrelationID sao chép ID tương quan từ siêu dữ liệu của tin nhắn đến tin nhắn được tạo ra
middleware.CorrelationID,
// Nếu trình xử lý trả về lỗi, thử lại hàm xử lý.
// Sau khi đạt tới MaxRetries, tin nhắn bị Nacked, và PubSub có trách nhiệm gửi lại nó.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer xử lý các sự cố trong trình xử lý.
// Trong trường hợp này, nó truyền chúng như lỗi cho middleware Retry.
middleware.Recoverer,
)
// Đơn giản, chúng ta sử dụng gochannel Pub/Sub ở đây,
// bạn có thể thay thế nó bằng bất kỳ triển khai Pub/Sub nào khác và hiệu quả sẽ là như nhau.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Tạo ra một số tin nhắn đến trong nền
go publishMessages(pubSub)
// AddHandler trả về một trình xử lý có thể được sử dụng để thêm middleware cấp handler
// hoặc để dừng trình xử lý.
handler := router.AddHandler(
"struct_handler", // Tên trình xử lý, phải là duy nhất
"incoming_messages_topic", // Chủ đề từ đó các sự kiện được đọc
pubSub,
"outgoing_messages_topic", // Chủ đề từ đó các sự kiện được xuất bản
pubSub,
structHandler{}.Handler,
)
// Middleware cấp handler chỉ được thực thi cho một trình xử lý cụ thể
// Loại middleware này có thể được thêm vào cùng cách như middleware cấp router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Đang thực thi middleware cụ thể cho trình xử lý", message.UUID)
return h(message)
}
})
// Vì mục đích gỡ lỗi, chúng ta in tất cả các tin nhắn nhận được trên "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Vì mục đích gỡ lỗi, chúng ta in tất cả các sự kiện được gửi đến "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Bây giờ khi tất cả các trình xử lý được đăng ký, chúng ta đang chạy router.
// Hàm Run chặn khi router đang chạy.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Tin Nhắn Đầu Vào
struct_handler
tiêu thụ các tin nhắn từ incoming_messages_topic
, vì vậy chúng ta mô phỏng luồng tin nhắn đầu vào bằng cách gọi publishMessages()
ở nền. Lưu ý rằng chúng ta đã thêm middleware SetCorrelationID
. Bộ định tuyến sẽ thêm một correlation ID vào tất cả các tin nhắn được tạo ra (được lưu trữ trong metadata).
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Xin chào, thế giới!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("đang gửi tin nhắn %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Bộ Xử Lý
Bạn có thể nhận thấy rằng có hai loại hàm xử lý:
- Hàm
func(msg *message.Message) ([]*message.Message, error)
- Phương thức
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Nếu bộ xử lý của bạn là một hàm không phụ thuộc vào bất kỳ phụ thuộc nào, việc sử dụng lựa chọn đầu tiên là tốt. Khi bộ xử lý của bạn yêu cầu một số phụ thuộc (chẳng hạn như xử lý cơ sở dữ liệu, ghi log, v.v.), lựa chọn thứ hai là hữu ích.
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Nhận tin nhắn: %s\n> %s\n> metadata: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Ở đây chúng ta có thể thêm một số phụ thuộc
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler nhận tin nhắn", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("tin nhắn được sản xuất bởi structHandler"))
return message.Messages{msg}, nil
}
Đã Xong!
Bạn có thể chạy ví dụ này bằng cách go run main.go
.