Publisher และ Subscriber เป็นส่วนต่ำสุดของ Watermill ในการใช้ในงานประยุกต์จริง ๆ คุณ通常ต้องการใช้ อินเตอร์เฟซและฟังก์ชันระดับสูง เช่น การเชื่อมต่อ การวัดผล คิวข้อความที่ไม่ถูกต้อง การลองอีกครั้ง การจำกัดอัตรา ฯลฯ
บางครั้ง คุณอาจไม่ต้องการส่ง Ack เมื่อการประมวลผลสำเร็จ บางครั้ง คุณอาจต้องการส่งข้อความหลังจากที่ข้อความอื่นถูกประมวลผล
เพื่อตอบสนองความต้องการเหล่านี้ มีส่วนสำคัญที่เรียกว่า Router.
การกำหนดค่า
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout กำหนดระยะเวลาเป็นกี่เวลาที่ Router ควรทำงานสำหรับตัวจัดการเมื่อจะปิด
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// ผลการตรวจสอบเห็นว่ามีข้อผิดพลาดในการกำหนดค่า Router
func (c RouterConfig) Validate() error {
return nil
}
// ...
ตัวจัดการ
ก่อนอื่นคุณต้อง implement ฟังก์ชัน HandlerFunc
:
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc คือ ฟังก์ชันที่เรียกเมื่อมีการรับข้อความ
//
// เมื่อ HandlerFunc ทำงานเสร็จโดยไม่มีการคืนค่าข้อผิดพลาด msg.Ack() จะถูกเรียกโดยอัตโนมัติ
//
// เมื่อ HandlerFunc ทำงานเสร็จแล้วมีการคืนค่าข้อผิดพลาด msg.Nack() จะถูกเรียก
//
// เมื่อ msg.Ack() ถูกเรียกใน handler และ HandlerFunc คืนค่าข้อผิดพลาด
// msg.Nack() จะไม่ถูกส่งเนื่องจาก Ack ได้ถูกส่งไปแล้ว
//
// เมื่อมีการรับข้อความหลายรายการ (เนื่องจาก msg.Ack() ถูกส่งใน HandlerFunc หรือ Subscriber รองรับการบริโภคหลายอัน)
// HandlerFunc จะถูกทำงานโดยประพันธ์แบบขนานระหว่างตนเอง
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
ต่อมาคุณต้องใช้ Router.AddHandler
เพื่อเพิ่มตัวจัดการใหม่:
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler เพิ่มตัวจัดการใหม่
//
// handlerName ต้องเป็นค่าที่ไม่ซ้ำกัน ในปัจจุบันมีสำหรับการสร้างข้อผิดพลาด
// subscribeTopic คือหัวข้อที่ตัวจัดการจะรับข้อความมา
// publishTopic คือหัวข้อข้อความที่ตัวจัดการคืนค่าจะถูกสร้างโดย Router
// เมื่อตัวจัดการต้องการเผยแพร่ไปยังหัวข้อหลายหัวข้อ
// แนะนำให้ฉีด Publisher เข้าไปในตัวจัดการเท่านั้น หรือดำเนินการกลางที่สามารถจับข้อความได้ตาม metadata และเผยแพร่ไปที่หัวข้อที่เฉพาะเจาะจง
// เมื่อเพิ่มตัวจัดการขณะที่ router กำลังทำงาน จำเป็นต้องเรียก RunHandlers() โดยชัดเจน
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("เพิ่มตัวจัดการ", watermill.LogFields{
"ชื่อตัวจัดการ": handlerName,
"หัวข้อ": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped is not always waiting for handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler เพิ่มตัวจัดการใหม่
// ตัวจัดการนี้ไม่สามารถคืนค่าข้อความได้
// เมื่อมันคืนค่าข้อความเกิดข้อผิดพลาดและส่งคืน Nack
//
// handlerName ต้องเป็นค่าที่ไม่ซ้ำกัน ในปัจจุบันมีสำหรับการสร้างข้อผิดพลาด
// subscribeTopic คือหัวข้อที่ตัวจัดการจะรับข้อความมา
// subscriber เป็น subscriber ที่ใช้ในการบริโภคข้อความ
// หากเพิ่มตัวจัดการขณะที่ router กำลังทำงาน จำเป็นต้องเรียก RunHandlers() โดยชัดเจน
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
อ้างอิงตัวอย่างการใช้ใน "การเริ่มต้น".
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler คืนค่า handler ที่สามารถใช้เพื่อเพิ่ม middleware ระดับ handler หรือหยุด handler
handler := router.AddHandler(
"struct_handler", // ชื่อของ handler ซึ่งจำเป็นต้องไม่ซ้ำกัน
"incoming_messages_topic", // หัวข้อที่อยู่ภายใน events ที่ถูกอ่าน
pubSub,
"outgoing_messages_topic", // หัวข้อที่ใช้เผยแพร่ events
pubSub,
structHandler{}.Handler,
)
// Middleware ระดับ handler จะถูก execute เฉพาะสำหรับ handler ที่ระบุ
// Middleware นี้สามารถเพิ่มได้ตามวิธีเดียวกับ middleware ระดับ router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("กำลัง execute middleware ที่เฉพาะ handler, UUID ของ message: ", message.UUID)
return h(message)
}
})
// ...
ไม่มีผู้จัดการ
ไม่ทุกผู้จัดการจะสร้างข้อความใหม่ คุณสามารถใช้ Router.AddNoPublisherHandler
เพื่อเพิ่มประเภทของผู้จัดการนี้:
รหัสฉบับเต็ม: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler เพิ่มผู้จัดการใหม่
// ผู้จัดการนี้ไม่สามารถส่งข้อความกลับได้
// เมื่อมีการส่งข้อความกลับมา จะเกิดข้อผิดพลาดและจะส่ง NACK ออกไป
//
// handlerName จะต้องไม่ซ้ำกันและในปัจจุบันใช้สำหรับวัตถุเพื่อตรวจสอบข้อผิดพลาด
//
// subscribeTopic คือหัวข้อที่ผู้จัดการจะได้รับข้อความ
//
// subscriber ใช้ในการบริโภคข้อความ
//
// หากคุณเพิ่มผู้จัดการในเราเตอร์ที่กำลังทำงานอยู่แล้ว คุณต้องเรียก RunHandlers() โดยชัดเจน
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
การยอมรับ
ตามค่าเริ่มต้น เมื่อ HanderFunc
ไม่มีการส่งคืนข้อผิดพลาด จะมีการเรียก msg.Ack()
ถ้ามีข้อผิดพลาด จะมีการเรียก msg.Nack()
ดังนั้นหลังจากการจัดการข้อความ คุณไม่จำเป็นต้องเรียก msg.Ack()
หรือ msg.Nack
(แน่นอนคุณสามารถทำตามต้องการ)
การสร้างข้อความ
เมื่อมีการส่งคืนข้อความหลายข้อ โปรดทราบว่าการปฏิบัติของส่งข้อความแบบชุดนั้น มักไม่รองรับการส่งข้อความแบบอะตอมิก หากตู้เก็บหรือการจัดเก็บไม่พร้อมใช้งานมีได้เพียงบางข้อความที่สร้างขึ้น และ msg.Nack()
จะถูกส่งออกไป
หากนี่เป็นปัญหา โปรดพิจารณาให้ผู้จัดการแต่ละคนส่งเพียงข้อความเดียว
การเรียกเราเตอร์
ในการเริ่มเราเตอร์ คุณต้องเรียก Run()
รหัสฉบับเต็ม: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run รันเพลักอินและผู้จัดการทั้งหมด และเริ่มการสับสะพานไปยังหัวข้อที่กำหนด
// เรียกยังไงแล้วไม่ได้ ก็จะสิ้นสุดการทำงานของเราเตอร์
//
// เมื่อผู้จัดการทุกคนหยุด (ตัวอย่างเช่นเนื่องจากการปิดการสับขา) เราเตอร์ก็จะหยุดด้วย
//
// เพื่อหยุดการทำงานของ Run() คุณควรเรียก Close() บนเราเตอร์
//
// ctx จะถูกส่งไปยังผู้บริโภคทั้งหมด
//
// เมื่อผู้จัดการทุกคนหยุด (ตัวอย่างเช่นเนื่องจากการปิดการสับขา) Run() ก็จะหยุดด้วย
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
การแน่ใจว่าเราเตอร์กำลังทำงาน
การเข้าใจว่าเราเตอร์กำลังทำงานอาจเป็นประโยชน์ คุณสามารถทำได้โดยใช้วิธี Running()
รหัสฉบับเต็ม: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running ปิดเมื่อเราเตอร์กำลังทำงาน
// กล่าวคือคุณสามารถรอให้เราเตอร์ทำงานได้ดังนี้:
// fmt.Println("เริ่มเราเตอร์")
// go r.Run(ctx)
// // fmt.Println("เราเตอร์กำลังทำงาน")
// คำเตือน: เนื่องจากเหตุผลประวัติศาสตร์ ช่องนี้ไม่ทราบถึงการปิดการทำงานของเราเตอร์ - มันจะปิดไปถ้าเราเตอร์ยังทำงานและต่อมาปิดการทำงาน
func (r *Router) Running() chan struct{} {
// ...
}
คุณยังสามารถใช้ฟังก์ชัน IsRunning
ซึ่งจะส่งค่าบูลีนอสออกมา:
รหัสฉบับเต็ม: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
//IsRunning ส่งคืนค่า true เมื่อเราเตอร์กำลังทำงาน
//
// คำเตือน: เนื่องจากเหตุผลประวัติศาสตร์ วิธีนี้ไม่ทราบถึงสถานะการปิดการทำงานของเราเตอร์
// หากคุณต้องการทราบว่าเราเตอร์ถูกปิดการทำงานหรือยัง โปรดใช้ IsClosed
func (r *Router) IsRunning() bool {
// ...
}
ปิด router
เพื่อที่จะปิด router คุณต้องเรียก Close()
.
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close ปิด router อย่างสง่างามพร้อมกับการตั้งเวลาหมดอายุที่ถูกกำหนดไว้ใน configuration.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
จะปิดทุกคนที่ตำแหน่งและสมาชิก และรอให้ handler ทุกคนทำงานเสร็จสมบูรณ์.
Close()
จะรอให้ถึงเวลาหมดอายุที่ถูกตั้งไว้ใน RouterConfig.CloseTimeout
ใน configuration. หากถึงเวลาหมดอายุ Close()
จะรีเทิร์นเป็น error.
เพิ่ม handler หลังจากที่ router เริ่มทำงาน
คุณสามารถเพิ่ม handler ใหม่เมื่อ router กำลังทำงาน โดยการเรียก AddNoPublisherHandler
หรือ AddHandler
แล้วตามด้วย RunHandlers
.
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers ทำการรัน handler ทั้งหมดที่ถูกเพิ่มหลังจาก Run() ออกไป
// RunHandlers เป็น idempotent ดังนั้นสามารถเรียกได้หลายครั้งอย่างปลอดภัย
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
หยุดการทำงานของ handler ที่กำลังทำงาน
คุณสามารถหยุด เพียง 1 ตัวของ handler ที่กำลังทำงานอยู่เท่านั้น โดยการเรียก Stop()
.
โปรดทราบว่า router จะปิดตัวเมื่อไม่มี handler ที่กำลังทำงาน.
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop หยุดการทำงานของ handler
// Stop เป็น asynchronous.
// คุณสามารถตรวจสอบว่า handler ได้ถูกหยุดด้วยฟังก์ชัน Stopped()
func (h *Handler) Stop() {
// ...
รูปแบบการทำงาน
Subscribers สามารถบรรจุข้อความเพียงหนึ่งข้อความต่อจากกันหรือหลายข้อความพร้อมกัน
-
Single message flow เป็นวิธีง่ายที่สุด ซึ่งหมายถึง subscribers จะไม่ได้รับข้อความใหม่จนกว่า
msg.Ack()
จะถูกเรียก -
Multiple message flow รองรับโดย subscribers เฉพาะบางอัน โดยการบรรจุข้อความของหลาย topic partitions พร้อมกัน ข้อความหลายตัวสามารถถูกบรรจุพร้อมกัน แม้ข้อความบางอันที่ไม่เคยถูก acknowledge ไว้ (ตัวอย่างเช่น subscribers ของ Kafka) router จะทำการดำเนินการในโดยการทำงาน
HandlerFunc
พร้อมกับการทำงานของหลายทางออก
โปรดอ่านเอกสารที่เกี่ยวข้องกับ Pub/Sub เพื่อเข้าใจรูปแบบการทำงานที่ถูก support
มิดเดิลแวร์
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware ช่วยให้เราสามารถเขียนบางอย่างคล้ายกับการตกแต่งสำหรับ HandlerFunc
// มันสามารถทำกาการดำเนินการบางอย่างก่อน (เช่น การปรับเปลี่ยนข้อความที่ถูกบรรจุเข้า) หรือหลังจากทำงานของ handler (การปรับเปลี่ยนข้อความที่ถูกสร้างขึ้น การ acknowledge/nack ข้อความที่ถูกบรรจุ, การจัดการข้อผิดพลาด, บันทึก, ฯลฯ)
//
// สามารถเรียกมิดเดิลแวร์เข้า router ของเราโดยใช้วิธี `AddMiddleware`
//
// ตัวอย่าง:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("ทำงานก่อน handler")
// ข้อความที่ถูกสร้าง, err := h(message)
// fmt.Println("ทำงานหลัง handler")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
รายการแบบมาตรฐานของมิดเดิลแวร์ทั้งหมดสามารถพบได้ใน Middlewares
ปลั๊กอิน
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin เป็นฟังก์ชันทำงานเมื่อ router เริ่มต้น
type RouterPlugin func(*Router) error
// ...
รายการทั้งหมดของปลั๊กอินมาตรฐานสามารถพบได้ใน message/router/plugin.
บทบาท
ค่าบางอย่างที่มีประโยชน์ถูกเก็บไว้ใน context
สำหรับแต่ละข้อความที่ได้รับจาก handler:
โค้ดภายใต้เส้นทาง: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx คืนชื่อของ message handler ใน router ที่ได้รับการบริโภคข้อความจาก context
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx คืนชื่อของประเภท publisher ของข้อความใน router จาก context
// ตัวอย่างเช่น สำหรับ Kafka มันจะเป็น `kafka.Publisher`
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx คืนชื่อของประเภท subscriber ของข้อความใน router จาก context
// ตัวอย่างเช่น สำหรับ Kafka มันจะเป็น `kafka.Subscriber`
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx คืนหัวข้อที่ข้อความได้รับการรับส่งใน router จาก context
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx คืนหัวข้อที่ข้อความจะถูกเผยแพร่ใน router จาก context
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...