Kurulum
go get -u github.com/ThreeDotsLabs/watermill
Hazırlık
Olay temelli uygulamaların temel fikri her zaman aynıdır: gelen mesajları dinlemek ve onlara tepki vermek. Watermill, bu davranışın birden fazla yayıncı ve aboneler için uygulanmasını destekler.
Watermill'in temel parçası Message'dır, bu, http
paketindeki http.Request
kadar önemlidir. Watermill özelliklerinin çoğu, bu yapıyı bir şekilde kullanır.
PubSub kitabının sağladığı karmaşık özelliklere rağmen, Watermill için bunları kullanmaya başlamanın sadece iki arayüzü uygulamak zorunluluğu vardır: Publisher
ve Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Mesajlara Abone Olma
Abone olmaya başlayalım. Subscribe
, bir konu adı bekler ve gelen mesajları almak için bir kanal döndürür. Konu'nun belirli anlamı, PubSub uygulamasına bağlı olarak değişir.
messages, err := subscriber.Subscribe(ctx, "örnek.konu")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("mesaj alındı: %s, yük: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Desteklenen PubSub örnekleri için detaylı örnekler için aşağıdaki içeriğe başvurun.
Go Kanalı Örneği
Tam örnek kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "örnek.konu")
if err != nil {
panic(err)
}
go process(messages)
// ...
Tam örnek kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("mesaj alındı: %s, yük: %s\n", msg.UUID, string(msg.Payload))
// Mesajı aldığımızı ve işlediğimizi onaylamamız gerekir,
// aksi takdirde birden fazla kez yeniden gönderilecektir.
msg.Ack()
}
}
Kafka Örneği
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// auto.offset.reset: 'earliest' ile aynıdır
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("mesaj alındı: %s, içerik: %s", msg.UUID, string(msg.Payload))
// Mesajın alındığını ve işlendiğini doğrulamamız gerekiyor,
// aksi halde mesaj sürekli olarak tekrar gönderilecektir.
msg.Ack()
}
}
RabbitMQ (AMQP) Örneği
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Bu yapılandırma aşağıdaki örneğe dayalıdır: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Basit bir kuyruk olarak kullanılır.
//
// Bir Yayın/Abone tarzı hizmet uygulamak istiyorsanız, lütfen şuraya bakın
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Mesaj alındı: %s, içerik: %s", msg.UUID, string(msg.Payload))
// Mesajın alındığını ve işlendiğini doğrulamamız gerekiyor,
// aksi halde mesaj sürekli olarak yeniden iletilir.
msg.Ack()
}
}
SQL Örneği
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB() // DB oluşturuluyor
logger := watermill.NewStdLogger(false, false) // Logger oluşturuluyor
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{}, // Şema uyumlu MySql varsayılan
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{}, // Offset adaptörü uyumlu MySql varsayılanı
InitializeSchema: true, // Şema başlatılıyor
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// Mesajın alındığını ve işlendiğini onaylamamız gerekiyor,
// aksi halde mesaj tekrar tekrar iletilecektir.
msg.Ack()
}
}
Mesaj Oluşturma
Watermill herhangi bir mesaj formatını zorunlu kılmaz. NewMessage
, özelliğin bir byte dizisi olmasını bekler. Dizeler, JSON, protobuf, Avro, gob veya []byte
'e seri hale getirilebilen başka bir formatta kullanılabilir.
Mesaj UUID'si isteğe bağlıdır, ancak hata ayıklamada yardımcı olması önerilir.
msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))
Mesaj Yayınlama
Publish
yöntemi, bir konu ve yayınlamak için bir veya daha fazla mesaj gerektirir.
err := publisher.Publish("örnek.konu", msg)
if err != nil {
panic(err)
}
Go Kanalı Örneği
Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
mesajlarıYayınla(pubSub)
}
func mesajlarıYayınla(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))
if err := publisher.Publish("örnek.konu", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Kafka Örneği
Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
mesajlarıYayınla(publisher)
}
func mesajlarıYayınla(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))
if err := publisher.Publish("örnek.konu", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
RabbitMQ (AMQP) Örneği
Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))
if err := publisher.Publish("örnek.konu", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
SQL Örneği
Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Merhaba, dünya!"}`))
if err := publisher.Publish("örnek_konu", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Mesaj Yönlendirici Kullanımı
Yayıncılar ve Aboneler, Watermill'in alt düzey parçalarıdır. Çoğu durumda, genellikle ilişkilendirme, metrikler, zehirli kuyruklar, yeniden deneme ve hız sınırlama gibi daha üst düzey arabirimleri ve özellikleri kullanmak isteyebilirsiniz.
Mesajın başarılı bir şekilde işlendiğinden emin olmadan mesajı yalnızca o zaman kabul etmek isteyebilirsiniz. Diğer durumlarda, hemen kabul etmek ve ardından düşünmek isteyebilirsiniz. Bazen gelen mesaja göre belirli eylemler gerçekleştirmek ve yanıt olarak başka bir mesaj yayınlamak isteyebilirsiniz.
Bu gereksinimleri karşılamak için Yönlendirici adında bir bileşen bulunmaktadır.
Mesaj Yönlendirici Örnek Uygulaması
Örnek uygulamanın akışı şöyledir:
- Her saniye
gelen_mesajlar_konu
üzerinde bir mesaj oluşturun. -
struct_handler
dinleyici,gelen_mesajlar_konu
'nu işler. Bir mesaj alır almaz UUID'yi yazdırır vegiden_mesajlar_konu
üzerinde yeni bir mesaj oluşturur. -
print_gelen_mesajlar
handlergelen_mesajlar_konu
'nu dinler ve mesajın UUID, veri yükü ve meta verilerini yazdırır. -
print_giden_mesajlar
handlergiden_mesajlar_konu
'nu dinler ve mesajın UUID, veri yükü ve meta verilerini yazdırır. İlişkilendirme ID'si,gelen_mesajlar_konu
'ndaki mesajla aynı olmalıdır.
Yönlendirici Konfigürasyonu
Öncelikle, yönelticiyi eklentileri ve ara yazılımları ekleyerek yapılandırın. Ardından, yönelticinin kullanacağı işleyicileri ayarlayın. Her işleyici mesajları bağımsız olarak işleyecektir.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Bu örnekte, basit bir günlükleyici uygulaması kullanıyoruz,
// kendi `watermill.LoggerAdapter` uygulamanızı sağlamak isteyebilirsiniz.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// SIGTERM sinyali alındığında, SignalsHandler Yönelticiyi kapatır.
// Aynı zamanda `r.Close()` çağrısı ile de yöneltici kapatılabilir.
router.AddPlugin(plugin.SignalsHandler)
// Yönlendirici düzeyindeki ara yazılımlar, yönelticiye gönderilen her mesaj için yürütülecektir
router.AddMiddleware(
// CorrelationID, gelen mesajın meta verilerinden eşlenik kimliği oluşturulan mesaja kopyalar
middleware.CorrelationID,
// İşleyici hata döndürürse, işleyici işlevini yeniden dene.
// MaxRetries'e ulaşıldıktan sonra, mesaj Nacked olur ve PubSub tarafından yeniden gönderilmesi sorumluluğunu taşır.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer, işleyicideki panikleri ele alır.
// Bu durumda, bunları hata olarak Retry ara yazılımına iletilir.
middleware.Recoverer,
)
// Basitlik için, burada gochannel Yay/Abone kullanıyoruz,
// başka bir Yay/Abone uygulaması ile değiştirebilir ve etki aynı olacaktır.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Arka planda bazı gelen mesajlar oluştur
go publishMessages(pubSub)
// AddHandler, işleyici düzeyindeki ara yazılımları eklemek veya işleyiciyi durdurmak için kullanılabilen bir işleyici döndürür.
handler := router.AddHandler(
"struct_handler", // İşleyici adı, benzersiz olmalıdır
"incoming_messages_topic", // Olayların okunduğu konu
pubSub,
"outgoing_messages_topic", // Olayların yayınlandığı konu
pubSub,
structHandler{}.Handler,
)
// İşleyici düzeyindeki ara yazılım, yalnızca belirli bir işleyici için yürütülür
// Bu tür ara yazılımlar, yöneltici düzeyindeki ara yazılımların eklenme şekliyle aynı şekilde eklenebilir
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("İşleyiciye özgü ara yazılımı", message.UUID, "için yürütülüyor")
return h(message)
}
})
// Hata ayıklama amaçlı olarak "incoming_messages_topic" üzerinde alınan tüm mesajları yazdırıyoruz
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Hata ayıklama amaçlı olarak "outgoing_messages_topic"e gönderilen tüm olayları yazdırıyoruz
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Tüm işleyicilerin kaydedilmesi tamamlandıktan sonra, yönelticiyi çalıştırıyoruz.
// Çalışma, yöneltici çalışırken bloke eder.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Gelen Mesajlar
struct_handler
, incoming_messages_topic
üzerinden mesajları tüketir, bu nedenle arka planda publishMessages()
’ı çağırarak gelen trafikleri simüle ediyoruz. Ek olarak, SetCorrelationID
ara yazılımını eklediğimize dikkat edin. Yönlendirici, oluşturulan tüm mesajlara (metadata içinde depolanan) bir korelasyon kimliği ekleyecektir.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Merhaba, dünya!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("mesaj gönderiliyor %s, korelasyon id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
İşleyiciler
İki tür işleyici fonksiyonu olduğunu fark etmiş olabilirsiniz:
- Fonksiyon
func(msg *message.Message) ([]*message.Message, error)
- Metod
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
İşleyiciniz, bağımlılıklara bağlı olmayan bir fonksiyon ise ilk seçeneği kullanmak uygundur. İşleyiciniz, veritabanı bağlantıları, günlük kayıtları vb. gibi bazı bağımlılıklara ihtiyaç duyuyorsa, ikinci seçenek kullanışlıdır.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Alınan mesaj: %s\n> %s\n> metadata: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Buraya bazı bağımlılıklar ekleyebiliriz
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler mesaj aldı", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("structHandler tarafından üretilen mesaj"))
return message.Messages{msg}, nil
}
Tamamlandı!
Bu örneği go run main.go
komutuyla çalıştırabilirsiniz.