Instalasi

go get -u github.com/ThreeDotsLabs/watermill

Persiapan

Idea dasar di balik aplikasi berbasis event selalu sama: mendengarkan pesan masuk dan bereaksi terhadapnya. Watermill mendukung implementasi perilaku ini untuk beberapa penerbit dan pelanggan.

Bagian inti dari Watermill adalah Message, yang sama pentingnya dengan http.Request dalam paket http. Sebagian besar fitur Watermill menggunakan struct ini dengan beberapa cara.

Meskipun fitur kompleks yang disediakan oleh library PubSub, untuk Watermill, hanya perlu mengimplementasikan dua antarmuka untuk mulai menggunakannya: Publisher dan Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

Berlangganan Pesan

Mari mulai dengan berlangganan. Subscribe mengharapkan nama topik dan mengembalikan saluran untuk menerima pesan masuk. Arti khusus dari topik tergantung pada implementasi PubSub.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("menerima pesan: %s, muatan: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

Untuk contoh detail dari PubSub yang didukung, silakan lihat konten berikut.

Contoh Kanal Go

Kode contoh lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

	messages, err := pubSub.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

Kode contoh lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("menerima pesan: %s, muatan: %s\n", msg.UUID, string(msg.Payload))

		// Kita perlu mengakui bahwa kita telah menerima dan memproses pesan,
		// jika tidak, pesan akan dikirim ulang beberapa kali.
		msg.Ack()
	}
}

Contoh Kafka

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// Setara dengan auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("menerima pesan: %s, muatan: %s", msg.UUID, string(msg.Payload))

		// Kita perlu mengakui bahwa kita telah menerima dan memproses pesan tersebut,
		// jika tidak, pesan akan terus diirim ulang.
		msg.Ack()
	}
}

Contoh RabbitMQ (AMQP)

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// Konfigurasi ini didasarkan pada contoh berikut: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// Ini digunakan sebagai antrian sederhana.
		//
		// Jika Anda ingin mengimplementasikan layanan gaya Pub/Sub, harap merujuk ke
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Menerima pesan: %s, muatan: %s", msg.UUID, string(msg.Payload))

		// Kita perlu mengakui bahwa pesan tersebut telah diterima dan diproses,
		// jika tidak, pesan akan diirim ulang secara berulang.
		msg.Ack()
	}
}

Contoh SQL

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Menerima pesan: %s, muatan: %s", msg.UUID, string(msg.Payload))

		// Kita perlu mengakui bahwa pesan telah diterima dan diproses,
		// jika tidak, pesan akan dikirim ulang secara berulang.
		msg.Ack()
	}
}

Buat Pesan

Watermill tidak mewajibkan format pesan tertentu. NewMessage mengharapkan muatan menjadi array byte. Anda bisa menggunakan string, JSON, protobuf, Avro, gob, atau format lain yang dapat di-serialize menjadi []byte.

UUID pesan adalah opsional, namun disarankan untuk membantu dalam debugging.

msg := message.NewMessage(watermill.NewUUID(), []byte("Halo, dunia!"))

Terbitkan Pesan

Metode Publish memerlukan topik dan satu atau lebih pesan untuk diterbitkan.

err := publisher.Publish("contoh.topik", msg)
if err != nil {
    panic(err)
}

Contoh Saluran Go

Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Halo, dunia!"))

		if err := publisher.Publish("contoh.topik", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Contoh Kafka

Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)
	
	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
	
	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Halo, dunia!"))

		if err := publisher.Publish("contoh.topik", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Contoh RabbitMQ (AMQP)

Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Contoh SQL

Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Menggunakan Message Router

Publisher dan Subscriber adalah bagian tingkat rendah dari Watermill. Pada kebanyakan kasus, Anda biasanya ingin menggunakan antarmuka dan fitur tingkat tinggi seperti korelasi, metrik, antrian racun, percobaan ulang, dan pembatasan tingkat.

Anda mungkin hanya ingin mengakui pesan setelah berhasil diproses. Dalam kasus lain, Anda mungkin ingin segera mengakui pesan tersebut dan kemudian mempertimbangkan untuk memprosesnya. Terkadang, Anda mungkin ingin melakukan tindakan tertentu berdasarkan pesan masuk dan menerbitkan pesan lain sebagai respons.

Untuk memenuhi persyaratan tersebut, ada komponen yang disebut Router.

Contoh Aplikasi dari Message Router

Alur aplikasi contoh adalah sebagai berikut:

  1. Menghasilkan pesan pada topik incoming_messages_topic setiap detik.
  2. Pendengar struct_handler menangani incoming_messages_topic. Setelah menerima pesan, ia mencetak UUID dan menghasilkan pesan baru pada topik outgoing_messages_topic.
  3. Pendengar print_incoming_messages mendengarkan incoming_messages_topic dan mencetak UUID, payload, serta metadata pesan.
  4. Pendengar print_outgoing_messages mendengarkan outgoing_messages_topic dan mencetak UUID, payload, serta metadata pesan. ID korelasi seharusnya sama dengan pesan pada incoming_messages_topic.

Konfigurasi Router

Pertama, konfigurasikan router dengan menambahkan plugin dan middleware. Kemudian, atur handler yang akan digunakan oleh router. Setiap handler akan memproses pesan secara independen.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// Untuk contoh ini, kita menggunakan implementasi logger sederhana,
	// Anda mungkin ingin menyediakan implementasi `watermill.LoggerAdapter` sendiri.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Ketika menerima sinyal SIGTERM, SignalsHandler akan menutup Router dengan mulus.
	// Anda juga dapat menutup router dengan memanggil `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// Middleware tingkat router akan dieksekusi untuk setiap pesan yang dikirim ke router
	router.AddMiddleware(
		// CorrelationID menyalin ID korelasi dari metadata pesan masuk ke pesan yang dihasilkan
		middleware.CorrelationID,

		// Jika handler mengembalikan error, coba lagi fungsi handler.
		// Setelah mencapai MaxRetries, pesan tersebut akan Ditolak, dan PubSub bertanggung jawab untuk mengirim ulang.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer menangani panic dalam handler.
		// Dalam kasus ini, itu memberikannya sebagai error ke middleware Retry.
		middleware.Recoverer,
	)

	// Untuk kesederhanaan, kami menggunakan gochannel Pub/Sub di sini,
	// Anda dapat menggantinya dengan implementasi Pub/Sub apa pun dan efeknya akan sama.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Hasilkan beberapa pesan masuk secara asinkron
	go publishMessages(pubSub)

	// AddHandler mengembalikan handler yang dapat digunakan untuk menambahkan middleware tingkat handler
	// atau untuk menghentikan handler.
	handler := router.AddHandler(
		"struct_handler",          // Nama Handler, harus unik
		"incoming_messages_topic", // Topik dari mana peristiwa dibaca
		pubSub,
		"outgoing_messages_topic", // Topik ke mana peristiwa diterbitkan
		pubSub,
		structHandler{}.Handler,
	)

	// Middleware tingkat handler hanya dieksekusi untuk handler tertentu
	// Jenis middleware ini dapat ditambahkan dengan cara yang sama seperti middleware tingkat router
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Menjalankan middleware khusus handler untuk", message.UUID)

			return h(message)
		}
	})

	// Untuk tujuan debugging, kita mencetak semua pesan yang diterima di "incoming_messages_topic"
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// Untuk tujuan debugging, kita mencetak semua peristiwa yang dikirim ke "outgoing_messages_topic"
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// Sekarang semua handler terdaftar, kita menjalankan router.
	// Run akan memblokir saat router berjalan.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

Pesan Masuk

struct_handler mengonsumsi pesan dari incoming_messages_topic, jadi kita mensimulasikan lalu lintas masuk dengan memanggil publishMessages() di latar belakang. Perhatikan bahwa kami menambahkan middleware SetCorrelationID. Router akan menambahkan ID korelasi ke semua pesan yang dihasilkan (disimpan dalam metadata).

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Halo, dunia!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("mengirim pesan %s, ID korelasi: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

Penangan

Anda mungkin telah memperhatikan bahwa ada dua jenis fungsi penangan:

  1. Fungsi func(msg *message.Message) ([]*message.Message, error)
  2. Metode func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

Jika penangan Anda adalah sebuah fungsi yang tidak tergantung pada dependensi apa pun, maka menggunakan opsi pertama sudah cukup baik. Ketika penangan Anda memerlukan beberapa dependensi (seperti koneksi basis data, logger, dll.), maka opsi kedua akan berguna.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Menerima pesan: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Di sini kita dapat menambahkan beberapa dependensi
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler menerima pesan", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("pesan diproduksi oleh structHandler"))
    return message.Messages{msg}, nil
}

Selesai!

Anda dapat menjalankan contoh ini dengan go run main.go.