Instalacja

go get -u github.com/ThreeDotsLabs/watermill

Przygotowanie

Podstawowa koncepcja aplikacji opartych na zdarzeniach zawsze jest taka sama: nasłuchiwanie przychodzących wiadomości i reagowanie na nie. Watermill obsługuje implementację tego zachowania dla wielu wydawców i subskrybentów.

Rdzeń Watermill stanowi Wiadomość, która jest równie ważna jak http.Request w paczce http. Większość funkcji Watermill wykorzystuje tę strukturę w różny sposób.

Pomimo złożonych funkcji dostarczanych przez bibliotekę PubSub, dla Watermill wystarczy zaimplementować tylko dwie interfejsy, aby zacząć ich używać: Publisher i Subscriber.

type Publisher interface {
    Publish(topic string, messages ...*Message) error
    Close() error
}

type Subscriber interface {
    Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

Subskrybowanie Wiadomości

Zacznijmy od subskrybowania. Metoda Subscribe oczekuje nazwy tematu i zwraca kanał do odbierania przychodzących wiadomości. Konkretny sens tematu zależy od implementacji PubSub.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
    panic(err)
}

for msg := range messages {
    fmt.Printf("odebrano wiadomość: %s, zawartość: %s\n", msg.UUID, string(msg.Payload))
    msg.Ack()
}

Dla szczegółowych przykładów obsługiwanych PubSub, proszę odwołać się do następującej treści.

Przykład Kanału Go

Pełny przykładowy kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
    pubSub := gochannel.NewGoChannel(
        gochannel.Config{},
        watermill.NewStdLogger(false, false),
    )

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
    panic(err)
}

go process(messages)
// ...

Pełny przykładowy kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        fmt.Printf("odebrano wiadomość: %s, zawartość: %s\n", msg.UUID, string(msg.Payload))

    // Musimy potwierdzić, że otrzymaliśmy i przetworzyliśmy wiadomość, w przeciwnym razie zostanie ona wielokrotnie wysłana ponownie.
        msg.Ack()
    }
}

Przykład Kafka

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
    "context"
    "log"
    "time"

    "github.com/Shopify/sarama"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
    // Odpowiada auto.offset.reset: earliest
    saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:               []string{"kafka:9092"},
            Unmarshaler:           kafka.DefaultMarshaler{},
            OverwriteSaramaConfig: saramaSubscriberConfig,
            ConsumerGroup:         "test_consumer_group",
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...
}

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("odebrano wiadomość: %s, dane: %s", msg.UUID, string(msg.Payload))

        // Musimy potwierdzić otrzymanie i przetworzenie wiadomości,
        // w przeciwnym razie wiadomość będzie powtarzalnie ponownie wysyłana.
        msg.Ack()
    }
}

Przykład RabbitMQ (AMQP)

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
    "context"
    "log"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
    "github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
    amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

    subscriber, err := amqp.NewSubscriber(
        // Ta konfiguracja opiera się na następującym przykładzie: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
        // Jest używana jako prosta kolejka.
        //
        // Jeśli chcesz zaimplementować usługę w stylu Pub/Sub, proszę odnieść się do
        // https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
        amqpConfig,
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...
}

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("Odebrano wiadomość: %s, dane: %s", msg.UUID, string(msg.Payload))

        // Musimy potwierdzić, że wiadomość została odebrana i przetworzona,
        // w przeciwnym razie wiadomość będzie ponownie dostarczana.
        msg.Ack()
    }
}

Przykład SQL

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
    "context"
    stdSQL "database/sql"
    "log"
    "time"

    driver "github.com/go-sql-driver/mysql"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    db := createDB()
    logger := watermill.NewStdLogger(false, false)

    subscriber, err := sql.NewSubscriber(
        db,
        sql.SubscriberConfig{
            SchemaAdapter:    sql.DefaultMySQLSchema{},
            OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
            InitializeSchema: true,
        },
        logger,
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example_topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // Musimy potwierdzić, że wiadomość została odebrana i przetworzona,
        // w przeciwnym razie wiadomość będzie powtarzalnie ponownie dostarczana.
        msg.Ack()
    }
}

Utwórz wiadomość

Watermill nie narzuca żadnego formatu wiadomości. Funkcja NewMessage oczekuje, że ładunek będzie cięciem bajtowym. Możesz użyć ciągów znaków, JSON, protobuf, Avro, gob lub dowolnego innego formatu, który można zserializować do []byte.

UUID wiadomości jest opcjonalne, ale zaleca się jego użycie, ponieważ pomaga w debugowaniu.

msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))

Opublikuj wiadomość

Metoda Publish wymaga tematu i co najmniej jednej wiadomości do opublikowania.

err := publisher.Publish("przykladowy.temat", msg)
if err != nil {
    panic(err)
}

Przykład kanału Go

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
    go process(messages)

    opublikujWiadomosci(pubSub)
}

func opublikujWiadomosci(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))

        if err := publisher.Publish("przykladowy.temat", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

Przykład Kafka

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
    go process(messages)

    publisher, err := kafka.NewPublisher(
        kafka.PublisherConfig{
            Brokers:   []string{"kafka:9092"},
            Marshaler: kafka.DefaultMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    opublikujWiadomosci(publisher)
}

func opublikujWiadomosci(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))

        if err := publisher.Publish("przykladowy.temat", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

Przykład RabbitMQ (AMQP)

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
    go process(messages)

    publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

Przykład SQL

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
    go process(messages)

    publisher, err := sql.NewPublisher(
        db,
        sql.PublisherConfig{
            SchemaAdapter: sql.DefaultMySQLSchema{},
        },
        logger,
    )
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func createDB() *stdSQL.DB {
    conf := driver.NewConfig()
    conf.Net = "tcp"
    conf.User = "root"
    conf.Addr = "mysql"
    conf.DBName = "watermill"

    db, err := stdSQL.Open("mysql", conf.FormatDSN())
    if err != nil {
        panic(err)
    }

    err = db.Ping()
    if err != nil {
        panic(err)
    }

    return db
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Witaj, świecie!"}`))

        if err := publisher.Publish("example_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

Korzystanie z Router’a wiadomości

Publikatory i subskrybenci stanowią część niższego poziomu Watermilla. W większości przypadków zazwyczaj chcesz korzystać z interfejsów i funkcji wyższego poziomu, takich jak korelacje, metryki, kolejki trujące, powtórzenia i ograniczanie przepustowości.

Być może chcesz potwierdzać wiadomość dopiero po jej pomyślnym przetworzeniu. W innych przypadkach możesz chcieć ją natychmiast potwierdzić, a następnie rozważyć przetworzenie. Czasami chcesz wykonać określone czynności na podstawie przychodzącej wiadomości i opublikować w odpowiedzi inną wiadomość.

Aby sprostać tym wymaganiom, istnieje komponent o nazwie Router.

Przykładowa aplikacja Router’a wiadomości

Przebieg przykładowej aplikacji jest następujący:

  1. Co sekundę generowana jest wiadomość na temat incoming_messages_topic.
  2. Nasłuchiwacz struct_handler obsługuje temat incoming_messages_topic. Po otrzymaniu wiadomości drukuje UUID i generuje nową wiadomość na temat outgoing_messages_topic.
  3. Obsługa print_incoming_messages nasłuchuje na temat incoming_messages_topic i drukuje UUID, ładunek i metadane wiadomości.
  4. Obsługa print_outgoing_messages nasłuchuje na temat outgoing_messages_topic i drukuje UUID, ładunek i metadane wiadomości. Identyfikator korelacyjny powinien być taki sam jak wiadomość na temat incoming_messages_topic.

Konfiguracja routera

Najpierw skonfiguruj router poprzez dodanie wtyczek i oprogramowania pośredniczącego (middleware). Następnie ustaw obsługujące programy obsługi, które router będzie używał. Każdy program obsługi będzie niezależnie przetwarzał wiadomości.

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/router/middleware"
    "github.com/ThreeDotsLabs/watermill/message/router/plugin"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
    // Dla tego przykładu używamy prostego wdrożenia rejestrowania, 
    // możesz chcieć dostarczyć własną implementację `watermill.LoggerAdapter`.
    logger = watermill.NewStdLogger(false, false)
)

func main() {
    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

    // Przyjmując sygnał SIGTERM, SignalsHandler wyłączy Router w sposób kontrolowany.
    // Możesz także wyłączyć router, wywołując `r.Close()`.
    router.AddPlugin(plugin.SignalsHandler)

    // Oprogramowanie pośredniczące na poziomie routera zostanie wykonane dla każdej wiadomości wysłanej do routera
    router.AddMiddleware(
        // CorrelationID kopiuje identyfikator korelacji z metadanych przychodzącej wiadomości do wygenerowanej wiadomości
        middleware.CorrelationID,

        // Jeśli program obsługi zwraca błąd, ponawia funkcję programu obsługi.
        // Po osiągnięciu MaxRetries, wiadomość zostanie odrzucona (Nacked), a PubSub będzie odpowiedzialny za jej ponowne przesłanie.
        middleware.Retry{
            MaxRetries:      3,
            InitialInterval: time.Millisecond * 100,
            Logger:          logger,
        }.Middleware,

        // Recoverer obsługuje paniki w programie obsługi.
        // W tym przypadku przekazuje je jako błędy do oprogramowania pośredniczącego Retry.
        middleware.Recoverer,
    )

    // Dla uproszczenia tutaj używamy publikacji/subskrypcji gochannel,
    // możesz je zastąpić dowolną implementacją Pub/Sub, a efekt będzie taki sam.
    pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

    // Generowanie niektórych przychodzących wiadomości w tle
    go publishMessages(pubSub)

    // AddHandler zwraca program obsługi, który może być użyty do dodawania oprogramowania pośredniczącego na poziomie programu obsługi
    // lub do zatrzymania programu obsługi.
    handler := router.AddHandler(
        "struct_handler",          // Nazwa programu obsługi, musi być unikalna
        "incoming_messages_topic", // Temat, z którego czytane są zdarzenia
        pubSub,
        "outgoing_messages_topic", // Temat, do którego zdarzenia są publikowane
        pubSub,
        structHandler{}.Handler,
    )

    // Oprogramowanie pośredniczące na poziomie programu obsługi jest wykonywane tylko dla określonego programu obsługi
    // Ten rodzaj oprogramowania pośredniczącego może być dodany w ten sam sposób, co na poziomie routera
    handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
        return func(message *message.Message) ([]*message.Message, error) {
            log.Println("Wykonywanie oprogramowania pośredniczącego na poziomie programu obsługi dla", message.UUID)

            return h(message)
        }
    })

    // W celach debugowania drukujemy wszystkie otrzymane wiadomości na "incoming_messages_topic"
    router.AddNoPublisherHandler(
        "print_incoming_messages",
        "incoming_messages_topic",
        pubSub,
        printMessages,
    )

    // W celach debugowania drukujemy wszystkie wysłane zdarzenia do "outgoing_messages_topic"
    router.AddNoPublisherHandler(
        "print_outgoing_messages",
        "outgoing_messages_topic",
        pubSub,
        printMessages,
    )

    // Teraz, gdy wszystkie programy obsługi są zarejestrowane, uruchamiamy router.
    // Metoda Run blokuje się podczas działania routera.
    ctx := context.Background()
    if err := router.Run(ctx); err != nil {
        panic(err)
    }
}
// ...

Przychodzące wiadomości

struct_handler konsumuje wiadomości z tematu incoming_messages_topic, więc symulujemy przychodzący ruch, wywołując publishMessages() w tle. Zauważ, że dodaliśmy middleware SetCorrelationID. Router doda identyfikator korelacji do wszystkich wygenerowanych wiadomości (przechowywanych w metadanych).

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Witaj, świecie!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("wysyłanie wiadomości %s, identyfikator korelacji: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

Obsługiwacze

Mogłeś zauważyć, że istnieją dwa rodzaje funkcji obsługujących:

  1. Funkcja func(msg *message.Message) ([]*message.Message, error)
  2. Metoda func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

Jeśli twój obsługiwacz to funkcja, która nie zależy od żadnych zależności, to użycie pierwszej opcji jest odpowiednie. Gdy twój obsługiwacz wymaga pewnych zależności (takich jak uchwyty do bazy danych, loggery, itp.), przydatna jest druga opcja.

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Otrzymano wiadomość: %s\n> %s\n> metadane: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Tutaj możemy dodać pewne zależności
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler otrzymał wiadomość", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("wiadomość wygenerowana przez structHandler"))
    return message.Messages{msg}, nil
}

Gotowe!

Możesz uruchomić ten przykład za pomocą polecenia go run main.go.