Instalasi
go get -u github.com/ThreeDotsLabs/watermill
Persiapan
Idea dasar di balik aplikasi berbasis event selalu sama: mendengarkan pesan masuk dan bereaksi terhadapnya. Watermill mendukung implementasi perilaku ini untuk beberapa penerbit dan pelanggan.
Bagian inti dari Watermill adalah Message, yang sama pentingnya dengan http.Request
dalam paket http
. Sebagian besar fitur Watermill menggunakan struct ini dengan beberapa cara.
Meskipun fitur kompleks yang disediakan oleh library PubSub, untuk Watermill, hanya perlu mengimplementasikan dua antarmuka untuk mulai menggunakannya: Publisher
dan Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Berlangganan Pesan
Mari mulai dengan berlangganan. Subscribe
mengharapkan nama topik dan mengembalikan saluran untuk menerima pesan masuk. Arti khusus dari topik tergantung pada implementasi PubSub.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("menerima pesan: %s, muatan: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Untuk contoh detail dari PubSub yang didukung, silakan lihat konten berikut.
Contoh Kanal Go
Kode contoh lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Kode contoh lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("menerima pesan: %s, muatan: %s\n", msg.UUID, string(msg.Payload))
// Kita perlu mengakui bahwa kita telah menerima dan memproses pesan,
// jika tidak, pesan akan dikirim ulang beberapa kali.
msg.Ack()
}
}
Contoh Kafka
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Setara dengan auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("menerima pesan: %s, muatan: %s", msg.UUID, string(msg.Payload))
// Kita perlu mengakui bahwa kita telah menerima dan memproses pesan tersebut,
// jika tidak, pesan akan terus diirim ulang.
msg.Ack()
}
}
Contoh RabbitMQ (AMQP)
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Konfigurasi ini didasarkan pada contoh berikut: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Ini digunakan sebagai antrian sederhana.
//
// Jika Anda ingin mengimplementasikan layanan gaya Pub/Sub, harap merujuk ke
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Menerima pesan: %s, muatan: %s", msg.UUID, string(msg.Payload))
// Kita perlu mengakui bahwa pesan tersebut telah diterima dan diproses,
// jika tidak, pesan akan diirim ulang secara berulang.
msg.Ack()
}
}
Contoh SQL
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Menerima pesan: %s, muatan: %s", msg.UUID, string(msg.Payload))
// Kita perlu mengakui bahwa pesan telah diterima dan diproses,
// jika tidak, pesan akan dikirim ulang secara berulang.
msg.Ack()
}
}
Buat Pesan
Watermill tidak mewajibkan format pesan tertentu. NewMessage
mengharapkan muatan menjadi array byte. Anda bisa menggunakan string, JSON, protobuf, Avro, gob, atau format lain yang dapat di-serialize menjadi []byte
.
UUID pesan adalah opsional, namun disarankan untuk membantu dalam debugging.
msg := message.NewMessage(watermill.NewUUID(), []byte("Halo, dunia!"))
Terbitkan Pesan
Metode Publish
memerlukan topik dan satu atau lebih pesan untuk diterbitkan.
err := publisher.Publish("contoh.topik", msg)
if err != nil {
panic(err)
}
Contoh Saluran Go
Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Halo, dunia!"))
if err := publisher.Publish("contoh.topik", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Contoh Kafka
Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Halo, dunia!"))
if err := publisher.Publish("contoh.topik", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Contoh RabbitMQ (AMQP)
Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Contoh SQL
Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Menggunakan Message Router
Publisher dan Subscriber adalah bagian tingkat rendah dari Watermill. Pada kebanyakan kasus, Anda biasanya ingin menggunakan antarmuka dan fitur tingkat tinggi seperti korelasi, metrik, antrian racun, percobaan ulang, dan pembatasan tingkat.
Anda mungkin hanya ingin mengakui pesan setelah berhasil diproses. Dalam kasus lain, Anda mungkin ingin segera mengakui pesan tersebut dan kemudian mempertimbangkan untuk memprosesnya. Terkadang, Anda mungkin ingin melakukan tindakan tertentu berdasarkan pesan masuk dan menerbitkan pesan lain sebagai respons.
Untuk memenuhi persyaratan tersebut, ada komponen yang disebut Router.
Contoh Aplikasi dari Message Router
Alur aplikasi contoh adalah sebagai berikut:
- Menghasilkan pesan pada topik
incoming_messages_topic
setiap detik. - Pendengar
struct_handler
menanganiincoming_messages_topic
. Setelah menerima pesan, ia mencetak UUID dan menghasilkan pesan baru pada topikoutgoing_messages_topic
. - Pendengar
print_incoming_messages
mendengarkanincoming_messages_topic
dan mencetak UUID, payload, serta metadata pesan. - Pendengar
print_outgoing_messages
mendengarkanoutgoing_messages_topic
dan mencetak UUID, payload, serta metadata pesan. ID korelasi seharusnya sama dengan pesan padaincoming_messages_topic
.
Konfigurasi Router
Pertama, konfigurasikan router dengan menambahkan plugin dan middleware. Kemudian, atur handler yang akan digunakan oleh router. Setiap handler akan memproses pesan secara independen.
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Untuk contoh ini, kita menggunakan implementasi logger sederhana,
// Anda mungkin ingin menyediakan implementasi `watermill.LoggerAdapter` sendiri.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Ketika menerima sinyal SIGTERM, SignalsHandler akan menutup Router dengan mulus.
// Anda juga dapat menutup router dengan memanggil `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Middleware tingkat router akan dieksekusi untuk setiap pesan yang dikirim ke router
router.AddMiddleware(
// CorrelationID menyalin ID korelasi dari metadata pesan masuk ke pesan yang dihasilkan
middleware.CorrelationID,
// Jika handler mengembalikan error, coba lagi fungsi handler.
// Setelah mencapai MaxRetries, pesan tersebut akan Ditolak, dan PubSub bertanggung jawab untuk mengirim ulang.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer menangani panic dalam handler.
// Dalam kasus ini, itu memberikannya sebagai error ke middleware Retry.
middleware.Recoverer,
)
// Untuk kesederhanaan, kami menggunakan gochannel Pub/Sub di sini,
// Anda dapat menggantinya dengan implementasi Pub/Sub apa pun dan efeknya akan sama.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Hasilkan beberapa pesan masuk secara asinkron
go publishMessages(pubSub)
// AddHandler mengembalikan handler yang dapat digunakan untuk menambahkan middleware tingkat handler
// atau untuk menghentikan handler.
handler := router.AddHandler(
"struct_handler", // Nama Handler, harus unik
"incoming_messages_topic", // Topik dari mana peristiwa dibaca
pubSub,
"outgoing_messages_topic", // Topik ke mana peristiwa diterbitkan
pubSub,
structHandler{}.Handler,
)
// Middleware tingkat handler hanya dieksekusi untuk handler tertentu
// Jenis middleware ini dapat ditambahkan dengan cara yang sama seperti middleware tingkat router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Menjalankan middleware khusus handler untuk", message.UUID)
return h(message)
}
})
// Untuk tujuan debugging, kita mencetak semua pesan yang diterima di "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Untuk tujuan debugging, kita mencetak semua peristiwa yang dikirim ke "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Sekarang semua handler terdaftar, kita menjalankan router.
// Run akan memblokir saat router berjalan.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Pesan Masuk
struct_handler
mengonsumsi pesan dari incoming_messages_topic
, jadi kita mensimulasikan lalu lintas masuk dengan memanggil publishMessages()
di latar belakang. Perhatikan bahwa kami menambahkan middleware SetCorrelationID
. Router akan menambahkan ID korelasi ke semua pesan yang dihasilkan (disimpan dalam metadata).
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Halo, dunia!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("mengirim pesan %s, ID korelasi: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Penangan
Anda mungkin telah memperhatikan bahwa ada dua jenis fungsi penangan:
- Fungsi
func(msg *message.Message) ([]*message.Message, error)
- Metode
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Jika penangan Anda adalah sebuah fungsi yang tidak tergantung pada dependensi apa pun, maka menggunakan opsi pertama sudah cukup baik. Ketika penangan Anda memerlukan beberapa dependensi (seperti koneksi basis data, logger, dll.), maka opsi kedua akan berguna.
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Menerima pesan: %s\n> %s\n> metadata: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Di sini kita dapat menambahkan beberapa dependensi
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler menerima pesan", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("pesan diproduksi oleh structHandler"))
return message.Messages{msg}, nil
}
Selesai!
Anda dapat menjalankan contoh ini dengan go run main.go
.