Publisher และ Subscriber เป็นส่วนต่ำสุดของ Watermill ในการใช้ในงานประยุกต์จริง ๆ คุณ通常ต้องการใช้ อินเตอร์เฟซและฟังก์ชันระดับสูง เช่น การเชื่อมต่อ การวัดผล คิวข้อความที่ไม่ถูกต้อง การลองอีกครั้ง การจำกัดอัตรา ฯลฯ

บางครั้ง คุณอาจไม่ต้องการส่ง Ack เมื่อการประมวลผลสำเร็จ บางครั้ง คุณอาจต้องการส่งข้อความหลังจากที่ข้อความอื่นถูกประมวลผล

เพื่อตอบสนองความต้องการเหล่านี้ มีส่วนสำคัญที่เรียกว่า Router.

Watermill Router

การกำหนดค่า

โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout กำหนดระยะเวลาเป็นกี่เวลาที่ Router ควรทำงานสำหรับตัวจัดการเมื่อจะปิด
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// ผลการตรวจสอบเห็นว่ามีข้อผิดพลาดในการกำหนดค่า Router
func (c RouterConfig) Validate() error {
	return nil
}
// ...

ตัวจัดการ

ก่อนอื่นคุณต้อง implement ฟังก์ชัน HandlerFunc:

โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc คือ ฟังก์ชันที่เรียกเมื่อมีการรับข้อความ

// 
// เมื่อ HandlerFunc ทำงานเสร็จโดยไม่มีการคืนค่าข้อผิดพลาด msg.Ack() จะถูกเรียกโดยอัตโนมัติ
// 
// เมื่อ HandlerFunc ทำงานเสร็จแล้วมีการคืนค่าข้อผิดพลาด msg.Nack() จะถูกเรียก
// 
// เมื่อ msg.Ack() ถูกเรียกใน handler และ HandlerFunc คืนค่าข้อผิดพลาด
// msg.Nack() จะไม่ถูกส่งเนื่องจาก Ack ได้ถูกส่งไปแล้ว
// 
// เมื่อมีการรับข้อความหลายรายการ (เนื่องจาก msg.Ack() ถูกส่งใน HandlerFunc หรือ Subscriber รองรับการบริโภคหลายอัน)
// HandlerFunc จะถูกทำงานโดยประพันธ์แบบขนานระหว่างตนเอง
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

ต่อมาคุณต้องใช้ Router.AddHandler เพื่อเพิ่มตัวจัดการใหม่:

โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler เพิ่มตัวจัดการใหม่

// 

// handlerName ต้องเป็นค่าที่ไม่ซ้ำกัน ในปัจจุบันมีสำหรับการสร้างข้อผิดพลาด

// subscribeTopic คือหัวข้อที่ตัวจัดการจะรับข้อความมา

// publishTopic คือหัวข้อข้อความที่ตัวจัดการคืนค่าจะถูกสร้างโดย Router

// เมื่อตัวจัดการต้องการเผยแพร่ไปยังหัวข้อหลายหัวข้อ

// แนะนำให้ฉีด Publisher เข้าไปในตัวจัดการเท่านั้น หรือดำเนินการกลางที่สามารถจับข้อความได้ตาม metadata และเผยแพร่ไปที่หัวข้อที่เฉพาะเจาะจง

// เมื่อเพิ่มตัวจัดการขณะที่ router กำลังทำงาน จำเป็นต้องเรียก RunHandlers() โดยชัดเจน

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("เพิ่มตัวจัดการ", watermill.LogFields{

		"ชื่อตัวจัดการ": handlerName,

		"หัวข้อ":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		name:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped is not always waiting for handlerAdded

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler เพิ่มตัวจัดการใหม่

// ตัวจัดการนี้ไม่สามารถคืนค่าข้อความได้

// เมื่อมันคืนค่าข้อความเกิดข้อผิดพลาดและส่งคืน Nack

//

// handlerName ต้องเป็นค่าที่ไม่ซ้ำกัน ในปัจจุบันมีสำหรับการสร้างข้อผิดพลาด

// subscribeTopic คือหัวข้อที่ตัวจัดการจะรับข้อความมา

// subscriber เป็น subscriber ที่ใช้ในการบริโภคข้อความ

// หากเพิ่มตัวจัดการขณะที่ router กำลังทำงาน จำเป็นต้องเรียก RunHandlers() โดยชัดเจน

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

อ้างอิงตัวอย่างการใช้ใน "การเริ่มต้น".
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler คืนค่า handler ที่สามารถใช้เพื่อเพิ่ม middleware ระดับ handler หรือหยุด handler
	handler := router.AddHandler(
		"struct_handler",          // ชื่อของ handler ซึ่งจำเป็นต้องไม่ซ้ำกัน
		"incoming_messages_topic", // หัวข้อที่อยู่ภายใน events ที่ถูกอ่าน
		pubSub,
		"outgoing_messages_topic", // หัวข้อที่ใช้เผยแพร่ events
		pubSub,
		structHandler{}.Handler,
	)

	// Middleware ระดับ handler จะถูก execute เฉพาะสำหรับ handler ที่ระบุ
	// Middleware นี้สามารถเพิ่มได้ตามวิธีเดียวกับ middleware ระดับ router
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("กำลัง execute middleware ที่เฉพาะ handler, UUID ของ message: ", message.UUID)

			return h(message)
		}
	})
// ...

ไม่มีผู้จัดการ

ไม่ทุกผู้จัดการจะสร้างข้อความใหม่ คุณสามารถใช้ Router.AddNoPublisherHandler เพื่อเพิ่มประเภทของผู้จัดการนี้:

รหัสฉบับเต็ม: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler เพิ่มผู้จัดการใหม่
// ผู้จัดการนี้ไม่สามารถส่งข้อความกลับได้
// เมื่อมีการส่งข้อความกลับมา จะเกิดข้อผิดพลาดและจะส่ง NACK ออกไป
//
// handlerName จะต้องไม่ซ้ำกันและในปัจจุบันใช้สำหรับวัตถุเพื่อตรวจสอบข้อผิดพลาด
//
// subscribeTopic คือหัวข้อที่ผู้จัดการจะได้รับข้อความ
//
// subscriber ใช้ในการบริโภคข้อความ
//
// หากคุณเพิ่มผู้จัดการในเราเตอร์ที่กำลังทำงานอยู่แล้ว คุณต้องเรียก RunHandlers() โดยชัดเจน
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

การยอมรับ

ตามค่าเริ่มต้น เมื่อ HanderFunc ไม่มีการส่งคืนข้อผิดพลาด จะมีการเรียก msg.Ack() ถ้ามีข้อผิดพลาด จะมีการเรียก msg.Nack() ดังนั้นหลังจากการจัดการข้อความ คุณไม่จำเป็นต้องเรียก msg.Ack() หรือ msg.Nack (แน่นอนคุณสามารถทำตามต้องการ)

การสร้างข้อความ

เมื่อมีการส่งคืนข้อความหลายข้อ โปรดทราบว่าการปฏิบัติของส่งข้อความแบบชุดนั้น มักไม่รองรับการส่งข้อความแบบอะตอมิก หากตู้เก็บหรือการจัดเก็บไม่พร้อมใช้งานมีได้เพียงบางข้อความที่สร้างขึ้น และ msg.Nack() จะถูกส่งออกไป

หากนี่เป็นปัญหา โปรดพิจารณาให้ผู้จัดการแต่ละคนส่งเพียงข้อความเดียว

การเรียกเราเตอร์

ในการเริ่มเราเตอร์ คุณต้องเรียก Run()

รหัสฉบับเต็ม: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run รันเพลักอินและผู้จัดการทั้งหมด และเริ่มการสับสะพานไปยังหัวข้อที่กำหนด
// เรียกยังไงแล้วไม่ได้ ก็จะสิ้นสุดการทำงานของเราเตอร์
//
// เมื่อผู้จัดการทุกคนหยุด (ตัวอย่างเช่นเนื่องจากการปิดการสับขา) เราเตอร์ก็จะหยุดด้วย
//
// เพื่อหยุดการทำงานของ Run() คุณควรเรียก Close() บนเราเตอร์
//
// ctx จะถูกส่งไปยังผู้บริโภคทั้งหมด
//
// เมื่อผู้จัดการทุกคนหยุด (ตัวอย่างเช่นเนื่องจากการปิดการสับขา) Run() ก็จะหยุดด้วย
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

การแน่ใจว่าเราเตอร์กำลังทำงาน

การเข้าใจว่าเราเตอร์กำลังทำงานอาจเป็นประโยชน์ คุณสามารถทำได้โดยใช้วิธี Running()

รหัสฉบับเต็ม: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running ปิดเมื่อเราเตอร์กำลังทำงาน
// กล่าวคือคุณสามารถรอให้เราเตอร์ทำงานได้ดังนี้:

// 	fmt.Println("เริ่มเราเตอร์")
//	go r.Run(ctx)
//	//	fmt.Println("เราเตอร์กำลังทำงาน")

// คำเตือน: เนื่องจากเหตุผลประวัติศาสตร์ ช่องนี้ไม่ทราบถึงการปิดการทำงานของเราเตอร์ - มันจะปิดไปถ้าเราเตอร์ยังทำงานและต่อมาปิดการทำงาน
func (r *Router) Running() chan struct{} {
// ...
}

คุณยังสามารถใช้ฟังก์ชัน IsRunning ซึ่งจะส่งค่าบูลีนอสออกมา:

รหัสฉบับเต็ม: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
//IsRunning ส่งคืนค่า true เมื่อเราเตอร์กำลังทำงาน
//
// คำเตือน: เนื่องจากเหตุผลประวัติศาสตร์ วิธีนี้ไม่ทราบถึงสถานะการปิดการทำงานของเราเตอร์
// หากคุณต้องการทราบว่าเราเตอร์ถูกปิดการทำงานหรือยัง โปรดใช้ IsClosed
func (r *Router) IsRunning() bool {
// ...
}

ปิด router

เพื่อที่จะปิด router คุณต้องเรียก Close().

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close ปิด router อย่างสง่างามพร้อมกับการตั้งเวลาหมดอายุที่ถูกกำหนดไว้ใน configuration.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() จะปิดทุกคนที่ตำแหน่งและสมาชิก และรอให้ handler ทุกคนทำงานเสร็จสมบูรณ์.

Close() จะรอให้ถึงเวลาหมดอายุที่ถูกตั้งไว้ใน RouterConfig.CloseTimeout ใน configuration. หากถึงเวลาหมดอายุ Close() จะรีเทิร์นเป็น error.

เพิ่ม handler หลังจากที่ router เริ่มทำงาน

คุณสามารถเพิ่ม handler ใหม่เมื่อ router กำลังทำงาน โดยการเรียก AddNoPublisherHandler หรือ AddHandler แล้วตามด้วย RunHandlers.

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers ทำการรัน handler ทั้งหมดที่ถูกเพิ่มหลังจาก Run() ออกไป
// RunHandlers เป็น idempotent ดังนั้นสามารถเรียกได้หลายครั้งอย่างปลอดภัย
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

หยุดการทำงานของ handler ที่กำลังทำงาน

คุณสามารถหยุด เพียง 1 ตัวของ handler ที่กำลังทำงานอยู่เท่านั้น โดยการเรียก Stop().

โปรดทราบว่า router จะปิดตัวเมื่อไม่มี handler ที่กำลังทำงาน.

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop หยุดการทำงานของ handler
// Stop เป็น asynchronous.
// คุณสามารถตรวจสอบว่า handler ได้ถูกหยุดด้วยฟังก์ชัน Stopped()
func (h *Handler) Stop() {
// ...

รูปแบบการทำงาน

Subscribers สามารถบรรจุข้อความเพียงหนึ่งข้อความต่อจากกันหรือหลายข้อความพร้อมกัน

  • Single message flow เป็นวิธีง่ายที่สุด ซึ่งหมายถึง subscribers จะไม่ได้รับข้อความใหม่จนกว่า msg.Ack() จะถูกเรียก
  • Multiple message flow รองรับโดย subscribers เฉพาะบางอัน โดยการบรรจุข้อความของหลาย topic partitions พร้อมกัน ข้อความหลายตัวสามารถถูกบรรจุพร้อมกัน แม้ข้อความบางอันที่ไม่เคยถูก acknowledge ไว้ (ตัวอย่างเช่น subscribers ของ Kafka) router จะทำการดำเนินการในโดยการทำงาน HandlerFunc พร้อมกับการทำงานของหลายทางออก

โปรดอ่านเอกสารที่เกี่ยวข้องกับ Pub/Sub เพื่อเข้าใจรูปแบบการทำงานที่ถูก support

มิดเดิลแวร์

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware ช่วยให้เราสามารถเขียนบางอย่างคล้ายกับการตกแต่งสำหรับ HandlerFunc
// มันสามารถทำกาการดำเนินการบางอย่างก่อน (เช่น การปรับเปลี่ยนข้อความที่ถูกบรรจุเข้า) หรือหลังจากทำงานของ handler (การปรับเปลี่ยนข้อความที่ถูกสร้างขึ้น การ acknowledge/nack ข้อความที่ถูกบรรจุ, การจัดการข้อผิดพลาด, บันทึก, ฯลฯ)
//
// สามารถเรียกมิดเดิลแวร์เข้า router ของเราโดยใช้วิธี `AddMiddleware`
//
// ตัวอย่าง:
//
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("ทำงานก่อน handler")
// 			ข้อความที่ถูกสร้าง, err := h(message)
// 			fmt.Println("ทำงานหลัง handler")
//
// 			return producedMessages, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

รายการแบบมาตรฐานของมิดเดิลแวร์ทั้งหมดสามารถพบได้ใน Middlewares

ปลั๊กอิน

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin เป็นฟังก์ชันทำงานเมื่อ router เริ่มต้น
type RouterPlugin func(*Router) error

// ...

รายการทั้งหมดของปลั๊กอินมาตรฐานสามารถพบได้ใน message/router/plugin.

บทบาท

ค่าบางอย่างที่มีประโยชน์ถูกเก็บไว้ใน context สำหรับแต่ละข้อความที่ได้รับจาก handler:

โค้ดภายใต้เส้นทาง: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx คืนชื่อของ message handler ใน router ที่ได้รับการบริโภคข้อความจาก context
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx คืนชื่อของประเภท publisher ของข้อความใน router จาก context
// ตัวอย่างเช่น สำหรับ Kafka มันจะเป็น `kafka.Publisher`
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx คืนชื่อของประเภท subscriber ของข้อความใน router จาก context
// ตัวอย่างเช่น สำหรับ Kafka มันจะเป็น `kafka.Subscriber`
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx คืนหัวข้อที่ข้อความได้รับการรับส่งใน router จาก context
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx คืนหัวข้อที่ข้อความจะถูกเผยแพร่ใน router จาก context
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...