Kurulum

go get -u github.com/ThreeDotsLabs/watermill

Hazırlık

Olay temelli uygulamaların temel fikri her zaman aynıdır: gelen mesajları dinlemek ve onlara tepki vermek. Watermill, bu davranışın birden fazla yayıncı ve aboneler için uygulanmasını destekler.

Watermill'in temel parçası Message'dır, bu, http paketindeki http.Request kadar önemlidir. Watermill özelliklerinin çoğu, bu yapıyı bir şekilde kullanır.

PubSub kitabının sağladığı karmaşık özelliklere rağmen, Watermill için bunları kullanmaya başlamanın sadece iki arayüzü uygulamak zorunluluğu vardır: Publisher ve Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

Mesajlara Abone Olma

Abone olmaya başlayalım. Subscribe, bir konu adı bekler ve gelen mesajları almak için bir kanal döndürür. Konu'nun belirli anlamı, PubSub uygulamasına bağlı olarak değişir.

messages, err := subscriber.Subscribe(ctx, "örnek.konu")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("mesaj alındı: %s, yük: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

Desteklenen PubSub örnekleri için detaylı örnekler için aşağıdaki içeriğe başvurun.

Go Kanalı Örneği

Tam örnek kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "örnek.konu")
if err != nil {
	panic(err)
}

go process(messages)
// ...

Tam örnek kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("mesaj alındı: %s, yük: %s\n", msg.UUID, string(msg.Payload))

	// Mesajı aldığımızı ve işlediğimizi onaylamamız gerekir,
	// aksi takdirde birden fazla kez yeniden gönderilecektir.
	msg.Ack()
	}
}

Kafka Örneği

Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// auto.offset.reset: 'earliest' ile aynıdır
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("mesaj alındı: %s, içerik: %s", msg.UUID, string(msg.Payload))

		// Mesajın alındığını ve işlendiğini doğrulamamız gerekiyor,
		// aksi halde mesaj sürekli olarak tekrar gönderilecektir.
		msg.Ack()
	}
}

RabbitMQ (AMQP) Örneği

Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// Bu yapılandırma aşağıdaki örneğe dayalıdır: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// Basit bir kuyruk olarak kullanılır.
		//
		// Bir Yayın/Abone tarzı hizmet uygulamak istiyorsanız, lütfen şuraya bakın
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Mesaj alındı: %s, içerik: %s", msg.UUID, string(msg.Payload))

		// Mesajın alındığını ve işlendiğini doğrulamamız gerekiyor,
		// aksi halde mesaj sürekli olarak yeniden iletilir.
		msg.Ack()
	}
}

SQL Örneği

Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB() 	// DB oluşturuluyor
	logger := watermill.NewStdLogger(false, false) 	// Logger oluşturuluyor

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{}, 	// Şema uyumlu MySql varsayılan
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{}, 	// Offset adaptörü uyumlu MySql varsayılanı
			InitializeSchema: true, 	// Şema başlatılıyor
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// Mesajın alındığını ve işlendiğini onaylamamız gerekiyor,
		// aksi halde mesaj tekrar tekrar iletilecektir.
		msg.Ack()
	}
}

Mesaj Oluşturma

Watermill herhangi bir mesaj formatını zorunlu kılmaz. NewMessage, özelliğin bir byte dizisi olmasını bekler. Dizeler, JSON, protobuf, Avro, gob veya []byte'e seri hale getirilebilen başka bir formatta kullanılabilir.

Mesaj UUID'si isteğe bağlıdır, ancak hata ayıklamada yardımcı olması önerilir.

msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))

Mesaj Yayınlama

Publish yöntemi, bir konu ve yayınlamak için bir veya daha fazla mesaj gerektirir.

err := publisher.Publish("örnek.konu", msg)
if err != nil {
    panic(err)
}

Go Kanalı Örneği

Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	mesajlarıYayınla(pubSub)
}

func mesajlarıYayınla(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))

		if err := publisher.Publish("örnek.konu", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Kafka Örneği

Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	mesajlarıYayınla(publisher)
}

func mesajlarıYayınla(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))

		if err := publisher.Publish("örnek.konu", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

RabbitMQ (AMQP) Örneği

Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))

		if err := publisher.Publish("örnek.konu", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

SQL Örneği

Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Merhaba, dünya!"}`))

		if err := publisher.Publish("örnek_konu", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Mesaj Yönlendirici Kullanımı

Yayıncılar ve Aboneler, Watermill'in alt düzey parçalarıdır. Çoğu durumda, genellikle ilişkilendirme, metrikler, zehirli kuyruklar, yeniden deneme ve hız sınırlama gibi daha üst düzey arabirimleri ve özellikleri kullanmak isteyebilirsiniz.

Mesajın başarılı bir şekilde işlendiğinden emin olmadan mesajı yalnızca o zaman kabul etmek isteyebilirsiniz. Diğer durumlarda, hemen kabul etmek ve ardından düşünmek isteyebilirsiniz. Bazen gelen mesaja göre belirli eylemler gerçekleştirmek ve yanıt olarak başka bir mesaj yayınlamak isteyebilirsiniz.

Bu gereksinimleri karşılamak için Yönlendirici adında bir bileşen bulunmaktadır.

Mesaj Yönlendirici Örnek Uygulaması

Örnek uygulamanın akışı şöyledir:

  1. Her saniye gelen_mesajlar_konu üzerinde bir mesaj oluşturun.
  2. struct_handler dinleyici, gelen_mesajlar_konu'nu işler. Bir mesaj alır almaz UUID'yi yazdırır ve giden_mesajlar_konu üzerinde yeni bir mesaj oluşturur.
  3. print_gelen_mesajlar handler gelen_mesajlar_konu'nu dinler ve mesajın UUID, veri yükü ve meta verilerini yazdırır.
  4. print_giden_mesajlar handler giden_mesajlar_konu'nu dinler ve mesajın UUID, veri yükü ve meta verilerini yazdırır. İlişkilendirme ID'si, gelen_mesajlar_konu'ndaki mesajla aynı olmalıdır.

Yönlendirici Konfigürasyonu

Öncelikle, yönelticiyi eklentileri ve ara yazılımları ekleyerek yapılandırın. Ardından, yönelticinin kullanacağı işleyicileri ayarlayın. Her işleyici mesajları bağımsız olarak işleyecektir.

Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// Bu örnekte, basit bir günlükleyici uygulaması kullanıyoruz,
	// kendi `watermill.LoggerAdapter` uygulamanızı sağlamak isteyebilirsiniz.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SIGTERM sinyali alındığında, SignalsHandler Yönelticiyi kapatır.
	// Aynı zamanda `r.Close()` çağrısı ile de yöneltici kapatılabilir.
	router.AddPlugin(plugin.SignalsHandler)

	// Yönlendirici düzeyindeki ara yazılımlar, yönelticiye gönderilen her mesaj için yürütülecektir
	router.AddMiddleware(
		// CorrelationID, gelen mesajın meta verilerinden eşlenik kimliği oluşturulan mesaja kopyalar
		middleware.CorrelationID,

		// İşleyici hata döndürürse, işleyici işlevini yeniden dene.
		// MaxRetries'e ulaşıldıktan sonra, mesaj Nacked olur ve PubSub tarafından yeniden gönderilmesi sorumluluğunu taşır.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer, işleyicideki panikleri ele alır.
		// Bu durumda, bunları hata olarak Retry ara yazılımına iletilir.
		middleware.Recoverer,
	)

	// Basitlik için, burada gochannel Yay/Abone kullanıyoruz,
	// başka bir Yay/Abone uygulaması ile değiştirebilir ve etki aynı olacaktır.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Arka planda bazı gelen mesajlar oluştur
	go publishMessages(pubSub)

	// AddHandler, işleyici düzeyindeki ara yazılımları eklemek veya işleyiciyi durdurmak için kullanılabilen bir işleyici döndürür.
	handler := router.AddHandler(
		"struct_handler",          // İşleyici adı, benzersiz olmalıdır
		"incoming_messages_topic", // Olayların okunduğu konu
		pubSub,
		"outgoing_messages_topic", // Olayların yayınlandığı konu
		pubSub,
		structHandler{}.Handler,
	)

	// İşleyici düzeyindeki ara yazılım, yalnızca belirli bir işleyici için yürütülür
	// Bu tür ara yazılımlar, yöneltici düzeyindeki ara yazılımların eklenme şekliyle aynı şekilde eklenebilir
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("İşleyiciye özgü ara yazılımı", message.UUID, "için yürütülüyor")

			return h(message)
		}
	})

	// Hata ayıklama amaçlı olarak "incoming_messages_topic" üzerinde alınan tüm mesajları yazdırıyoruz
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// Hata ayıklama amaçlı olarak "outgoing_messages_topic"e gönderilen tüm olayları yazdırıyoruz
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// Tüm işleyicilerin kaydedilmesi tamamlandıktan sonra, yönelticiyi çalıştırıyoruz.
	// Çalışma, yöneltici çalışırken bloke eder.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

Gelen Mesajlar

struct_handler, incoming_messages_topic üzerinden mesajları tüketir, bu nedenle arka planda publishMessages()’ı çağırarak gelen trafikleri simüle ediyoruz. Ek olarak, SetCorrelationID ara yazılımını eklediğimize dikkat edin. Yönlendirici, oluşturulan tüm mesajlara (metadata içinde depolanan) bir korelasyon kimliği ekleyecektir.

Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("mesaj gönderiliyor %s, korelasyon id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

İşleyiciler

İki tür işleyici fonksiyonu olduğunu fark etmiş olabilirsiniz:

  1. Fonksiyon func(msg *message.Message) ([]*message.Message, error)
  2. Metod func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

İşleyiciniz, bağımlılıklara bağlı olmayan bir fonksiyon ise ilk seçeneği kullanmak uygundur. İşleyiciniz, veritabanı bağlantıları, günlük kayıtları vb. gibi bazı bağımlılıklara ihtiyaç duyuyorsa, ikinci seçenek kullanışlıdır.

Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Alınan mesaj: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Buraya bazı bağımlılıklar ekleyebiliriz
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler mesaj aldı", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("structHandler tarafından üretilen mesaj"))
    return message.Messages{msg}, nil
}

Tamamlandı!

Bu örneği go run main.go komutuyla çalıştırabilirsiniz.