Instalacja
go get -u github.com/ThreeDotsLabs/watermill
Przygotowanie
Podstawowa koncepcja aplikacji opartych na zdarzeniach zawsze jest taka sama: nasłuchiwanie przychodzących wiadomości i reagowanie na nie. Watermill obsługuje implementację tego zachowania dla wielu wydawców i subskrybentów.
Rdzeń Watermill stanowi Wiadomość, która jest równie ważna jak http.Request
w paczce http
. Większość funkcji Watermill wykorzystuje tę strukturę w różny sposób.
Pomimo złożonych funkcji dostarczanych przez bibliotekę PubSub, dla Watermill wystarczy zaimplementować tylko dwie interfejsy, aby zacząć ich używać: Publisher
i Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Subskrybowanie Wiadomości
Zacznijmy od subskrybowania. Metoda Subscribe
oczekuje nazwy tematu i zwraca kanał do odbierania przychodzących wiadomości. Konkretny sens tematu zależy od implementacji PubSub.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("odebrano wiadomość: %s, zawartość: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Dla szczegółowych przykładów obsługiwanych PubSub, proszę odwołać się do następującej treści.
Przykład Kanału Go
Pełny przykładowy kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Pełny przykładowy kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("odebrano wiadomość: %s, zawartość: %s\n", msg.UUID, string(msg.Payload))
// Musimy potwierdzić, że otrzymaliśmy i przetworzyliśmy wiadomość, w przeciwnym razie zostanie ona wielokrotnie wysłana ponownie.
msg.Ack()
}
}
Przykład Kafka
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Odpowiada auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("odebrano wiadomość: %s, dane: %s", msg.UUID, string(msg.Payload))
// Musimy potwierdzić otrzymanie i przetworzenie wiadomości,
// w przeciwnym razie wiadomość będzie powtarzalnie ponownie wysyłana.
msg.Ack()
}
}
Przykład RabbitMQ (AMQP)
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Ta konfiguracja opiera się na następującym przykładzie: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Jest używana jako prosta kolejka.
//
// Jeśli chcesz zaimplementować usługę w stylu Pub/Sub, proszę odnieść się do
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Odebrano wiadomość: %s, dane: %s", msg.UUID, string(msg.Payload))
// Musimy potwierdzić, że wiadomość została odebrana i przetworzona,
// w przeciwnym razie wiadomość będzie ponownie dostarczana.
msg.Ack()
}
}
Przykład SQL
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// Musimy potwierdzić, że wiadomość została odebrana i przetworzona,
// w przeciwnym razie wiadomość będzie powtarzalnie ponownie dostarczana.
msg.Ack()
}
}
Utwórz wiadomość
Watermill nie narzuca żadnego formatu wiadomości. Funkcja NewMessage
oczekuje, że ładunek będzie cięciem bajtowym. Możesz użyć ciągów znaków, JSON, protobuf, Avro, gob lub dowolnego innego formatu, który można zserializować do []byte
.
UUID wiadomości jest opcjonalne, ale zaleca się jego użycie, ponieważ pomaga w debugowaniu.
msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))
Opublikuj wiadomość
Metoda Publish
wymaga tematu i co najmniej jednej wiadomości do opublikowania.
err := publisher.Publish("przykladowy.temat", msg)
if err != nil {
panic(err)
}
Przykład kanału Go
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
opublikujWiadomosci(pubSub)
}
func opublikujWiadomosci(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))
if err := publisher.Publish("przykladowy.temat", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Przykład Kafka
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
opublikujWiadomosci(publisher)
}
func opublikujWiadomosci(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))
if err := publisher.Publish("przykladowy.temat", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Przykład RabbitMQ (AMQP)
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Przykład SQL
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Witaj, świecie!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Korzystanie z Router’a wiadomości
Publikatory i subskrybenci stanowią część niższego poziomu Watermilla. W większości przypadków zazwyczaj chcesz korzystać z interfejsów i funkcji wyższego poziomu, takich jak korelacje, metryki, kolejki trujące, powtórzenia i ograniczanie przepustowości.
Być może chcesz potwierdzać wiadomość dopiero po jej pomyślnym przetworzeniu. W innych przypadkach możesz chcieć ją natychmiast potwierdzić, a następnie rozważyć przetworzenie. Czasami chcesz wykonać określone czynności na podstawie przychodzącej wiadomości i opublikować w odpowiedzi inną wiadomość.
Aby sprostać tym wymaganiom, istnieje komponent o nazwie Router.
Przykładowa aplikacja Router’a wiadomości
Przebieg przykładowej aplikacji jest następujący:
- Co sekundę generowana jest wiadomość na temat
incoming_messages_topic
. - Nasłuchiwacz
struct_handler
obsługuje tematincoming_messages_topic
. Po otrzymaniu wiadomości drukuje UUID i generuje nową wiadomość na tematoutgoing_messages_topic
. - Obsługa
print_incoming_messages
nasłuchuje na tematincoming_messages_topic
i drukuje UUID, ładunek i metadane wiadomości. - Obsługa
print_outgoing_messages
nasłuchuje na tematoutgoing_messages_topic
i drukuje UUID, ładunek i metadane wiadomości. Identyfikator korelacyjny powinien być taki sam jak wiadomość na tematincoming_messages_topic
.
Konfiguracja routera
Najpierw skonfiguruj router poprzez dodanie wtyczek i oprogramowania pośredniczącego (middleware). Następnie ustaw obsługujące programy obsługi, które router będzie używał. Każdy program obsługi będzie niezależnie przetwarzał wiadomości.
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Dla tego przykładu używamy prostego wdrożenia rejestrowania,
// możesz chcieć dostarczyć własną implementację `watermill.LoggerAdapter`.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Przyjmując sygnał SIGTERM, SignalsHandler wyłączy Router w sposób kontrolowany.
// Możesz także wyłączyć router, wywołując `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Oprogramowanie pośredniczące na poziomie routera zostanie wykonane dla każdej wiadomości wysłanej do routera
router.AddMiddleware(
// CorrelationID kopiuje identyfikator korelacji z metadanych przychodzącej wiadomości do wygenerowanej wiadomości
middleware.CorrelationID,
// Jeśli program obsługi zwraca błąd, ponawia funkcję programu obsługi.
// Po osiągnięciu MaxRetries, wiadomość zostanie odrzucona (Nacked), a PubSub będzie odpowiedzialny za jej ponowne przesłanie.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer obsługuje paniki w programie obsługi.
// W tym przypadku przekazuje je jako błędy do oprogramowania pośredniczącego Retry.
middleware.Recoverer,
)
// Dla uproszczenia tutaj używamy publikacji/subskrypcji gochannel,
// możesz je zastąpić dowolną implementacją Pub/Sub, a efekt będzie taki sam.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Generowanie niektórych przychodzących wiadomości w tle
go publishMessages(pubSub)
// AddHandler zwraca program obsługi, który może być użyty do dodawania oprogramowania pośredniczącego na poziomie programu obsługi
// lub do zatrzymania programu obsługi.
handler := router.AddHandler(
"struct_handler", // Nazwa programu obsługi, musi być unikalna
"incoming_messages_topic", // Temat, z którego czytane są zdarzenia
pubSub,
"outgoing_messages_topic", // Temat, do którego zdarzenia są publikowane
pubSub,
structHandler{}.Handler,
)
// Oprogramowanie pośredniczące na poziomie programu obsługi jest wykonywane tylko dla określonego programu obsługi
// Ten rodzaj oprogramowania pośredniczącego może być dodany w ten sam sposób, co na poziomie routera
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Wykonywanie oprogramowania pośredniczącego na poziomie programu obsługi dla", message.UUID)
return h(message)
}
})
// W celach debugowania drukujemy wszystkie otrzymane wiadomości na "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// W celach debugowania drukujemy wszystkie wysłane zdarzenia do "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Teraz, gdy wszystkie programy obsługi są zarejestrowane, uruchamiamy router.
// Metoda Run blokuje się podczas działania routera.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Przychodzące wiadomości
struct_handler
konsumuje wiadomości z tematu incoming_messages_topic
, więc symulujemy przychodzący ruch, wywołując publishMessages()
w tle. Zauważ, że dodaliśmy middleware SetCorrelationID
. Router doda identyfikator korelacji do wszystkich wygenerowanych wiadomości (przechowywanych w metadanych).
Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("wysyłanie wiadomości %s, identyfikator korelacji: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Obsługiwacze
Mogłeś zauważyć, że istnieją dwa rodzaje funkcji obsługujących:
- Funkcja
func(msg *message.Message) ([]*message.Message, error)
- Metoda
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Jeśli twój obsługiwacz to funkcja, która nie zależy od żadnych zależności, to użycie pierwszej opcji jest odpowiednie. Gdy twój obsługiwacz wymaga pewnych zależności (takich jak uchwyty do bazy danych, loggery, itp.), przydatna jest druga opcja.
Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Otrzymano wiadomość: %s\n> %s\n> metadane: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Tutaj możemy dodać pewne zależności
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler otrzymał wiadomość", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("wiadomość wygenerowana przez structHandler"))
return message.Messages{msg}, nil
}
Gotowe!
Możesz uruchomić ten przykład za pomocą polecenia go run main.go
.