Publisher and Subscriber adalah bagian-bagian tingkat rendah dari Watermill. Dalam aplikasi praktis, biasanya Anda ingin menggunakan antarmuka dan fungsi tingkat tinggi, seperti asosiasi, metrik, antrian pesan poison, retries, pembatasan tingkat, dan sebagainya.

Terkadang, Anda mungkin tidak ingin mengirim Ack ketika pemrosesan berhasil. Terkadang, Anda mungkin ingin mengirim pesan setelah pesan lain diproses.

Untuk memenuhi persyaratan ini, ada komponen yang disebut Router.

Pemacu Air Router

Konfigurasi

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout menentukan berapa lama router harus bekerja untuk handler saat menutup.
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate memeriksa apakah ada kesalahan dalam konfigurasi router.
func (c RouterConfig) Validate() error {
	return nil
}
// ...

Penangan

Pertama, Anda perlu mengimplementasikan fungsi HandlerFunc:

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc adalah fungsi yang dipanggil ketika pesan diterima.
//
// Ketika HandlerFunc tidak mengembalikan error, msg.Ack() akan dipanggil secara otomatis.
//
// Ketika HandlerFunc mengembalikan error, msg.Nack() akan dipanggil.
//
// Ketika msg.Ack() dipanggil dalam handler dan HandlerFunc mengembalikan error,
// msg.Nack() tidak akan dikirim karena Ack sudah dikirim.
//
// Ketika menerima multiple pesan (karena msg.Ack() dikirim dalam HandlerFunc atau Subscriber mendukung multiple konsumen),
// HandlerFunc akan dieksekusi secara konkuren.
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

Selanjutnya, Anda perlu menggunakan Router.AddHandler untuk menambahkan penangan baru:

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler menambahkan penangan baru.

// handlerName harus unik. Saat ini, hanya digunakan untuk debugging.

// subscribeTopic adalah topik dari mana penangan akan menerima pesan.

// publishTopic adalah topik di mana pesan yang dikembalikan oleh penangan akan dihasilkan oleh Router.

// Ketika penangan perlu menerbitkan ke multiple topik,

// disarankan hanya menyisipkan Publisher ke penangan atau mengimplementasikan middleware,

// yang dapat menangkap pesan berdasarkan metadata dan menerbitkan ke topik tertentu.

// Jika penangan ditambahkan saat router sudah berjalan, RunHandlers() perlu dipanggil secara eksplisit.

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("Menambahkan penangan", watermill.LogFields{

		"nama_penangan": handlerName,

		"topik":         subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		nama:        handlerName,

		logger:      r.logger,

		subscriber:  subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:      publisher,

		publishTopic:   publishTopic,

		publisherName:  publisherName,

		handlerFunc:    handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:            nil,

		routersCloseCh:        r.closingInProgressCh,

		startedCh:             make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped tidak selalu menunggu handlerAdded

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler menambahkan penangan baru.

// Penangan ini tidak dapat mengembalikan pesan.

// Ketika mengembalikan pesan, akan terjadi kesalahan dan sebuah Nack akan dikirim.

//

// handlerName harus unik. Saat ini, hanya digunakan untuk debugging.

// subscribeTopic adalah topik dari mana penangan akan menerima pesan.

// subscriber adalah subscriber yang digunakan untuk mengkonsumsi pesan.

// Jika penangan ditambahkan saat router sudah berjalan, RunHandlers() perlu dipanggil secara eksplisit.

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

Referensi penggunaan contoh dalam "Memulai". Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler mengembalikan sebuah handler yang dapat digunakan untuk menambahkan middleware tingkat handler atau menghentikan handler.
	handler := router.AddHandler(
		"struct_handler",          // nama handler, harus unik
		"incoming_messages_topic", // topik dari mana event dibaca
		pubSub,
		"outgoing_messages_topic", // topik untuk mempublikasikan event
		pubSub,
		structHandler{}.Handler,
	)

	// Middleware tingkat handler hanya dieksekusi untuk handler tertentu
	// Middleware ini dapat ditambahkan dengan cara yang sama seperti middleware tingkat router
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Menjalankan middleware khusus handler, UUID pesan: ", message.UUID)

			return h(message)
		}
	})
// ...

Tidak Ada Penangan Publisher

Tidak semua penangan akan menghasilkan pesan baru. Anda dapat menggunakan Router.AddNoPublisherHandler untuk menambahkan jenis penangan ini:

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler menambahkan penangan baru.
// Penangan ini tidak dapat mengembalikan pesan.
// Ketika mengembalikan pesan, akan terjadi kesalahan dan Nack akan dikirim.
//
// handlerName harus unik dan hanya digunakan untuk tujuan debug saat ini.
//
// subscribeTopic adalah topik di mana penangan akan menerima pesan.
//
// subscriber digunakan untuk mengonsumsi pesan.
//
// Jika Anda menambahkan penangan ke router yang sudah berjalan, Anda perlu memanggil RunHandlers() secara eksplisit.
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

Pengakuan

Secara default, ketika HanderFunc tidak mengembalikan kesalahan, msg.Ack() akan dipanggil. Jika terdapat kesalahan yang dikembalikan, msg.Nack() akan dipanggil. Jadi, setelah menangani pesan, Anda tidak perlu memanggil msg.Ack() atau msg.Nack (tentu saja, Anda bisa jika Anda mau).

Memproduksi Pesan

Ketika beberapa pesan dikembalikan oleh penangan, harap dicatat bahwa sebagian besar implementasi Publisher tidak mendukung penerbitan pesan secara atom. Jika broker atau penyimpanan tidak tersedia, hanya beberapa pesan yang dapat dihasilkan dan msg.Nack() akan dikirim.

Jika ini menjadi masalah, pertimbangkan untuk membuat setiap penangan hanya menerbitkan satu pesan.

Menjalankan Router

Untuk menjalankan router, Anda perlu memanggil Run().

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run menjalankan semua plugin dan penangan, dan mulai berlangganan ke topik yang diberikan.
// Panggilan ini akan diblokir saat router berjalan.
//
// Ketika semua penangan berhenti (misalnya karena langganan telah ditutup), router juga akan berhenti.
//
// Untuk menghentikan Run(), Anda harus memanggil Close() pada router.
//
// ctx akan dipropagasi ke semua pelanggan.
//
// Ketika semua penangan berhenti (misalnya karena koneksi tertutup), Run() juga akan berhenti.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

Memastikan Router Berjalan

Pemahaman apakah router sedang berjalan mungkin berguna. Anda dapat mencapainya menggunakan metode Running().

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running menutup saat router sedang berjalan.
// Dengan kata lain, Anda dapat menunggu hingga router berjalan seperti ini:

// 	fmt.Println("Memulai router")
//	go r.Run(ctx)
//	//	fmt.Println("Router sedang berjalan")

// Peringatan: Untuk alasan historis, saluran ini tidak mengetahui tentang penutupan router - saluran ini akan ditutup jika router terus berjalan dan kemudian berhenti.
func (r *Router) Running() chan struct{} {
// ...
}

Anda juga dapat menggunakan fungsi IsRunning yang mengembalikan nilai boolean:

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning mengembalikan true saat router sedang berjalan.
//
// Peringatan: Untuk alasan historis, metode ini tidak mengetahui tentang status penutupan router.
// Jika Anda ingin mengetahui apakah router telah ditutup, gunakan IsClosed.
func (r *Router) IsRunning() bool {
// ...
}

Mematikan router

Untuk mematikan router, Anda perlu memanggil Close().

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close menutup router dengan penyelesaian yang disediakan dalam konfigurasi.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() akan mematikan semua publisher dan subscriber, serta menunggu semua handler selesai.

Close() akan menunggu waktu habis yang diatur dalam RouterConfig.CloseTimeout dalam konfigurasi. Jika waktu habis, Close() akan mengembalikan error.

Menambahkan handler setelah menjalankan router

Anda dapat menambahkan handler baru ketika router sudah berjalan. Untuk melakukannya, Anda perlu memanggil AddNoPublisherHandler atau AddHandler, dan kemudian memanggil RunHandlers.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers menjalankan semua handler yang ditambahkan setelah Run().
// RunHandlers idempoten, sehingga bisa dipanggil berkali-kali dengan aman.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

Menghentikan handler yang sedang berjalan

Anda bisa menghentikan hanya satu handler yang sedang berjalan dengan memanggil Stop().

Harap diingat bahwa router akan dimatikan ketika tidak ada handler yang sedang berjalan.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop menghentikan handler.
// Stop bersifat asinkron.
// Anda dapat memeriksa apakah handler sudah dihentikan dengan fungsi Stopped().
func (h *Handler) Stop() {
// ...

Model eksekusi

Subscribers dapat mengonsumsi satu pesan secara berurutan atau beberapa pesan secara paralel.

  • Alur Pesan tunggal adalah metode yang paling sederhana, yang berarti subscriber tidak akan menerima pesan baru hingga msg.Ack() dipanggil.
  • Alur Pesan ganda didukung oleh sebagian subscriber tertentu. Dengan melakukan subscripsi ke beberapa partisi topik secara simultan, beberapa pesan dapat dikonsumsi secara paralel, bahkan pesan yang sebelumnya belum di-acknowledge (misalnya, bagaimana subscriber Kafka bekerja). Router memproses model ini dengan menjalankan HandlerFunc secara paralel.

Harap merujuk ke dokumentasi Pub/Sub yang dipilih untuk memahami model eksekusi yang didukung.

Middleware

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware memungkinkan kita untuk menulis sesuatu yang mirip dengan decorator untuk HandlerFunc.
// Ini dapat menjalankan beberapa operasi sebelum (misalnya, mengubah pesan yang dikonsumsi) atau setelah handler (mengubah pesan yang dihasilkan, ack/nack pesan yang dikonsumsi, menangani kesalahan, logging, dll.).
//
// Itu bisa dilampirkan ke router menggunakan metode `AddMiddleware`.
//
// Contoh:
//
// 	func ContohMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("dieksekusi sebelum handler")
// 			pesanDihasilkan, err := h(message)
// 			fmt.Println("dieksekusi setelah handler")
//
// 			return pesanDihasilkan, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

Daftar lengkap middleware standar dapat ditemukan di Middlewares.

Plugins

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin adalah fungsi yang dieksekusi ketika router mulai.
type RouterPlugin func(*Router) error

// ...

Daftar lengkap plugin standar dapat ditemukan di message/router/plugin.

Konteks

Beberapa nilai yang berguna disimpan dalam konteks untuk setiap pesan yang diterima oleh penangan:

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx mengembalikan nama penangan pesan di router yang mengonsumsi pesan dari konteks.
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx mengembalikan nama jenis penerbit pesan di router dari konteks.
// Sebagai contoh, untuk Kafka, itu akan menjadi `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx mengembalikan nama jenis pelanggan pesan di router dari konteks.
// Sebagai contoh, untuk Kafka, itu akan menjadi `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx mengembalikan topik dari mana pesan diterima di router dari konteks.
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx mengembalikan topik ke mana pesan akan dipublikasikan di router dari konteks.
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...