Установка
go get -u github.com/ThreeDotsLabs/watermill
Подготовка
Основная идея событийно-ориентированных приложений всегда одна и та же: прослушивание входящих сообщений и реагирование на них. Watermill поддерживает реализацию этого поведения для нескольких издателей и подписчиков.
Основная часть Watermill - это Message, которая так же важна, как http.Request
в пакете http
. Большинство функций Watermill используют эту структуру каким-либо образом.
Несмотря на сложные функции, предоставляемые библиотекой PubSub, для Watermill достаточно реализовать только два интерфейса для начала их использования: Publisher
и Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Подписка на сообщения
Давайте начнем с подписки. Subscribe
ожидает имя темы и возвращает канал для получения входящих сообщений. Конкретное значение темы зависит от реализации PubSub.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Для подробных примеров поддерживаемых PubSub обратитесь к следующему содержимому.
Пример Go Channel
Полный пример кода: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Полный пример кода: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
// Нам нужно подтвердить, что мы получили и обработали сообщение, иначе оно будет отправлено несколько раз.
msg.Ack()
}
}
Пример Kafka
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Эквивалентно auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("получено сообщение: %s, данные: %s", msg.UUID, string(msg.Payload))
// Необходимо подтвердить получение и обработку сообщения, иначе оно будет повторно отправлено.
msg.Ack()
}
}
Пример RabbitMQ (AMQP)
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Эта конфигурация основана на следующем примере: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Она используется как простая очередь.
//
// Если вы хотите реализовать сервис в стиле Pub/Sub, пожалуйста, обратитесь к
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Получено сообщение: %s, данные: %s", msg.UUID, string(msg.Payload))
// Необходимо подтвердить получение и обработку сообщения, иначе оно будет повторно доставлено.
msg.Ack()
}
}
Пример SQL
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// Необходимо подтвердить получение и обработку сообщения, иначе сообщение будет повторно доставляться.
msg.Ack()
}
}
Создание сообщения
Watermill не накладывает ограничения на формат сообщения. NewMessage
ожидает, что полезная нагрузка будет срезом байтов. Вы можете использовать строки, JSON, protobuf, Avro, gob или любой другой формат, который можно сериализовать в []byte
.
Идентификатор UUID сообщения является необязательным, но рекомендуется, так как он помогает при отладке.
msg := message.NewMessage(watermill.NewUUID(), []byte("Привет, мир!"))
Опубликовать сообщение
Метод Publish
требует указания темы и одного или нескольких сообщений для публикации.
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
Пример Go канала
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Привет, мир!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Пример Kafka
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Привет, мир!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Пример RabbitMQ (AMQP)
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Привет, мир!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Пример SQL
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Привет, мир!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Использование Маршрутизации сообщениями
Издатели и подписчики являются более низкоуровневыми частями Watermill. В большинстве случаев вы обычно хотели бы использовать более высокоуровневые интерфейсы и функции, такие как корреляции, метрики, отравленные очереди, повторы и ограничение скорости.
Возможно, вы захотите подтвердить сообщение только после успешной обработки. В других случаях вы можете захотеть немедленно подтвердить его, а затем рассмотреть обработку. Иногда вам может понадобиться выполнить определенные действия на основе входящего сообщения и опубликовать другое сообщение в ответ.
Чтобы удовлетворить эти требования, существует компонент под названием Маршрутизатор.
Пример приложения Маршрутизатора сообщений
Поток примерного приложения следующий:
- Генерация сообщения на теме
incoming_messages_topic
каждую секунду. - Обработчик
struct_handler
обрабатываетincoming_messages_topic
. После получения сообщения он печатает UUID и генерирует новое сообщение на темеoutgoing_messages_topic
. - Обработчик
print_incoming_messages
прослушивает темуincoming_messages_topic
и печатает UUID, полезную нагрузку и метаданные сообщения. - Обработчик
print_outgoing_messages
прослушивает темуoutgoing_messages_topic
и печатает UUID, полезную нагрузку и метаданные сообщения. Идентификатор корреляции должен быть таким же, как у сообщения на темеincoming_messages_topic
.
Настройка маршрутизатора
Сначала настройте маршрутизатор, добавив плагины и промежуточное программное обеспечение. Затем установите обработчики, которые будет использовать маршрутизатор. Каждый обработчик будет независимо обрабатывать сообщения.
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Для этого примера мы используем простую реализацию регистратора,
// вам может потребоваться предоставить свою реализацию `watermill.LoggerAdapter`.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Получив сигнал SIGTERM, SignalsHandler корректно закроет маршрутизатор.
// Вы также можете закрыть маршрутизатор, вызвав `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Промежуточное программное обеспечение на уровне маршрутизатора будет выполняться для каждого сообщения, отправленного маршрутизатору
router.AddMiddleware(
// CorrelationID копирует идентификатор корреляции из метаданных входящего сообщения в сгенерированное сообщение
middleware.CorrelationID,
// Если обработчик возвращает ошибку, повторно выполните функцию обработчика.
// После достижения MaxRetries сообщение помечается как Nacked, и за отправку его отвечает PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer обрабатывает паники в обработчике.
// В этом случае он передает их как ошибки в middleware Retry.
middleware.Recoverer,
)
// Для простоты мы используем здесь реализацию gochannel Pub/Sub,
// вы можете заменить его на любую реализацию Pub/Sub, и результат будет таким же.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Создание входящих сообщений в фоновом режиме
go publishMessages(pubSub)
// AddHandler возвращает обработчик, который можно использовать для добавления промежуточного программного обеспечения на уровне обработчика
// или для остановки обработчика.
handler := router.AddHandler(
"struct_handler", // Название обработчика, должно быть уникальным
"incoming_messages_topic", // Тема, из которой считываются события
pubSub,
"outgoing_messages_topic", // Тема, в которую публикуются события
pubSub,
structHandler{}.Handler,
)
// Промежуточное программное обеспечение на уровне обработчика выполняется только для конкретного обработчика
// Этот вид промежуточного программного обеспечения можно добавлять таким же образом, как промежуточное программное обеспечение на уровне маршрутизатора
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Выполняется специфическое промежуточное программное обеспечение обработчика для", message.UUID)
return h(message)
}
})
// Для целей отладки мы выводим все сообщения, полученные на "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Для целей отладки мы выводим все события, отправленные в "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Теперь, когда все обработчики зарегистрированы, мы запускаем маршрутизатор.
// Метод Run блокируется при работе маршрутизатора.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Входящие сообщения
struct_handler
обрабатывает сообщения из темы incoming_messages_topic
, поэтому мы имитируем входящий трафик, вызывая publishMessages()
в фоновом режиме. Обратите внимание, что мы добавили промежуточное программное обеспечение SetCorrelationID
. Маршрутизатор будет добавлять идентификатор корреляции ко всем сгенерированным сообщениям (хранящимся в метаданных).
Исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("отправка сообщения %s, идентификатор корреляции: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Обработчики
Вы могли заметить, что существует два типа функций-обработчиков:
- Функция
func(msg *message.Message) ([]*message.Message, error)
- Метод
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Если ваш обработчик является функцией, которая не зависит от каких-либо зависимостей, то использование первого варианта подходит. Если ваш обработчик требует некоторых зависимостей (например, обработчики базы данных, регистраторы и т. д.), то полезен второй вариант.
Исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Получено сообщение: %s\n> %s\n> метаданные: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Здесь мы можем добавить некоторые зависимости
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler получено сообщение", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("сообщение, созданное structHandler"))
return message.Messages{msg}, nil
}
Готово!
Вы можете запустить этот пример с помощью go run main.go
.