Publisher와 Subscriber는 Watermill의 하위 수준 구성 요소입니다. 실제 응용 프로그램에서는 보통 연관성, 메트릭, 포이즌 메시지 대기열, 재시도, 속도 제한 등과 같은 고수준 인터페이스와 기능을 사용하려고 합니다.
가끔은 처리가 성공적일 때 Ack을 보내고 싶지 않을 수도 있습니다. 또한, 다른 메시지가 처리된 후에 메시지를 보내고 싶을 수도 있습니다.
이러한 요구 사항을 충족하기 위해 Router라는 구성 요소가 있습니다.
구성
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout은 라우터가 핸들러가 닫힐 때 얼마 동안 작동해야 하는지를 결정합니다.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate는 라우터 구성에 오류가 있는지 확인합니다.
func (c RouterConfig) Validate() error {
return nil
}
// ...
핸들러
먼저 HandlerFunc
함수를 구현해야합니다.
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc는 메시지를 수신할 때 호출되는 함수입니다.
//
// HandlerFunc가 오류를 반환하지 않을 때, msg.Ack()가 자동으로 호출됩니다.
//
// HandlerFunc가 오류를 반환할 때, msg.Nack()이 호출됩니다.
//
// 핸들러에서 msg.Ack()가 호출되고 HandlerFunc이 오류를 반환하는 경우,
// 이미 Ack가 전송되었기 때문에 Nack이 전송되지 않습니다.
//
// 여러 메시지를 수신할 때 (HandlerFunc에 msg.Ack()가 전송되거나 여러 소비자를 지원하는 경우),
// HandlerFunc은 병렬로 실행됩니다.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
다음으로, Router.AddHandler
를 사용하여 새 핸들러를 추가해야합니다.
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler는 새 핸들러를 추가합니다.
// handlerName은 고유해야 합니다. 현재는 디버깅에만 사용됩니다.
// subscribeTopic은 핸들러가 메시지를 받게 될 주제입니다.
// publishTopic은 핸들러의 반환된 메시지가 Router에 의해 생성될 주제입니다.
// 핸들러가 여러 주제에 발행해야 할 때는
// 핸들러에게 Publisher를 주입하거나 메타데이터를 기반으로 메시지를 캡처하고 특정 주제에 발행할 수 있는 미들웨어를 구현하는 것이 권장됩니다.
// 라우터가 이미 실행 중인 상태에서 핸들러를 추가해야 하는 경우, RunHandlers()를 명시적으로 호출해야 합니다.
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("핸들러 추가 중", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped is not always waiting for handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler는 새 핸들러를 추가합니다.
// 이 핸들러는 메시지를 반환할 수 없습니다.
// 메시지를 반환하면 오류가 발생하고 Nack이 전송됩니다.
//
// handlerName은 고유해야 합니다. 현재는 디버깅에만 사용됩니다.
// subscribeTopic은 핸들러가 메시지를 받게 될 주제입니다.
// subscriber는 메시지를 소비하는 데 사용되는 구독자입니다.
// 라우터가 이미 실행 중인 상태에서 핸들러를 추가해야 하는 경우, RunHandlers()를 명시적으로 호출해야 합니다.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
"시작하기"에서 예제 사용법을 참조하십시오. 전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler는 핸들러 레벨의 미들웨어를 추가하거나 핸들러를 중지하는 데 사용할 수 있는 핸들러를 반환합니다.
handler := router.AddHandler(
"struct_handler", // 핸들러 이름, 고유해야 함
"incoming_messages_topic", // 이벤트가 읽히는 토픽
pubSub,
"outgoing_messages_topic", // 이벤트를 발행하는 토픽
pubSub,
structHandler{}.Handler,
)
// 핸들러 레벨의 미들웨어는 특정 핸들러에 대해서만 실행됩니다.
// 이 미들웨어는 라우터 레벨의 미들웨어와 동일한 방식으로 추가할 수 있습니다.
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("핸들러별 미들웨어를 실행 중, 메시지 UUID: ", message.UUID)
return h(message)
}
})
// ...
발행자 핸들러 없음
모든 핸들러가 새로운 메시지를 생성하지는 않습니다. Router.AddNoPublisherHandler
를 사용하여 이 유형의 핸들러를 추가할 수 있습니다:
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler는 새 핸들러를 추가합니다.
// 이 핸들러는 메시지를 반환할 수 없습니다.
// 메시지를 반환하면 오류가 발생하여 Nack이 전송됩니다.
//
// handlerName은 고유해야 하며 현재는 디버깅 목적으로만 사용됩니다.
//
// subscribeTopic은 핸들러가 메시지를 수신할 주제입니다.
//
// subscriber는 메시지를 소비하는 데 사용됩니다.
//
// 이미 실행 중인 라우터에 핸들러를 추가하는 경우에는 명시적으로 RunHandlers()를 호출해야 합니다.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
확인
기본적으로 HanderFunc
가 오류를 반환하지 않을 때 msg.Ack()
가 호출됩니다. 오류가 반환되면 msg.Nack()
가 호출됩니다. 따라서 메시지를 처리한 후에는 msg.Ack()
또는 msg.Nack
을 호출할 필요가 없습니다 (물론 원한다면 호출할 수 있습니다).
메시지 생성
핸들러에서 여러 메시지가 반환되는 경우 대부분의 발행자 구현은 메시지의 원자적 발행을 지원하지 않음에 유의하십시오. 브로커나 저장소를 사용할 수 없는 경우 일부 메시지만 생성될 수 있으며 msg.Nack()
이 전송됩니다.
이것이 문제라면 각 핸들러가 하나의 메시지만 발행하도록 고려하십시오.
라우터 실행
라우터를 실행하려면 Run()
을 호출해야 합니다.
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run은 모든 플러그인 및 핸들러를 실행하고 지정된 주제를 구독을 시작합니다.
// 이 호출은 라우터가 실행 중인 동안 차단됩니다.
//
// 모든 핸들러가 중지되면(예: 구독이 닫혔기 때문), 라우터도 중지됩니다.
//
// Run()을 중지하려면 라우터에서 Close()를 호출해야 합니다.
//
// ctx는 모든 구독자에게 전파됩니다.
//
// 모든 핸들러가 중지되면(ex: 연결이 닫혔기 때문), Run()도 중지됩니다.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
라우터 실행 확인
라우터가 실행 중인지 여부를 파악하는 것은 유용할 수 있습니다. Running()
메서드를 사용하여 이를 달성할 수 있습니다.
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running은 라우터가 실행 중일 때 닫힙니다.
// 다시 말해, 라우터가 실행 중인지 여부를 다음과 같이 확인할 수 있습니다:
// fmt.Println("라우터 시작")
// go r.Run(ctx)
// // fmt.Println("라우터 실행 중")
// 경고: 역사적인 이유로 이 채널은 라우터의 종료에 대해 알지 못합니다. 라우터가 계속 실행되고 종료되면 채널이 닫힐 것입니다.
func (r *Router) Running() chan struct{} {
// ...
}
또한 부울 값이 반환되는 IsRunning
함수를 사용할 수 있습니다:
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning은 라우터가 실행 중일 때 참을 반환합니다.
//
// 경고: 역사적인 이유로 이 메서드는 라우터의 닫힌 상태를 알지 못합니다. 라우터가 닫혔는지 여부를 알고 싶다면 IsClosed를 사용하십시오.
func (r *Router) IsRunning() bool {
// ...
}
라우터 종료
라우터를 종료하려면 Close()
를 호출해야 합니다.
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close는 구성에서 제공된 타임아웃으로 라우터를 정상적으로 종료합니다.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
를 호출하면 모든 퍼블리셔 및 구독자가 종료되고 모든 핸들러가 완료될 때까지 대기합니다.
Close()
는 구성에서 설정한 RouterConfig.CloseTimeout
시간까지 대기합니다. 시간초과가 발생하면 Close()
는 오류를 반환합니다.
라우터 시작 후 핸들러 추가
라우터가 이미 실행 중일 때 새로운 핸들러를 추가할 수 있습니다. 이를 위해서는 AddNoPublisherHandler
또는 AddHandler
를 호출한 다음 RunHandlers
를 호출해야 합니다.
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers는 Run() 이후에 추가된 모든 핸들러를 실행합니다.
// RunHandlers는 멱등성을 가지므로 여러 번 안전하게 호출할 수 있습니다.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
실행 중인 핸들러 중지
Stop()
을 호출하여 하나의 실행 중인 핸들러만 중지할 수 있습니다.
라우터에 실행 중인 핸들러가 없을 때 라우터가 종료됨에 유의하십시오.
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop은 핸들러를 중지합니다.
// Stop은 비동기적으로 동작합니다.
// 중지된 핸들러인지 확인할 수 있습니다.
func (h *Handler) Stop() {
// ...
실행 모델
구독자는 단일 메시지를 순차적으로 소비하거나 병렬로 여러 메시지를 소비할 수 있습니다.
-
단일 메시지 흐름은 가장 간단한 방법으로, 구독자가
msg.Ack()
가 호출될 때까지 새로운 메시지를 받지 않습니 다. -
다중 메시지 흐름은 일부 구독자만 지원합니다. 여러 주제 파티션을 동시에 구독하여 병렬로 여러 메시지를 소비할 수 있으며, 이전에 확인되지 않은 메시지도 소비할 수 있습니다 (예: 카프카 구독자 동작 방식). 라우터는 이 모델을
HandlerFunc
를 병렬로 실행함으로써 처리합니다.
지원되는 실행 모델을 이해하려면 선택한 Pub/Sub 문서를 참조하십시오.
미들웨어
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware를 사용하여 HandlerFunc와 유사한 것을 작성할 수 있습니다.
// HandlerMiddleware는 handler 이전에 (예: 수신된 메시지 수정) 또는 이후 (생성된 메시지 수정, 수신된 메시지를 확인 또는 거부, 오류 처리, 로그 기록 등)에 일부 작업을 실행할 수 있습니다.
//
// 이를 라우터에 `AddMiddleware` 메소드를 사용하여 연결할 수 있습니다.
//
// 예:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("handler 이전에 실행")
// createdMessages, err := h(message)
// fmt.Println("handler 이후에 실행")
//
// return createdMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
표준 미들웨어의 전체 목록은 Middlewares에서 찾을 수 있습니다.
플러그인
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin은 라우터 시작 시 실행되는 함수입니다.
type RouterPlugin func(*Router) error
// ...
표준 플러그인의 전체 목록은 message/router/plugin에서 찾을 수 있습니다.
문맥
핸들러가 수신하는 각 메시지에 대해 context
에 유용한 값이 저장됩니다.
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx는 라우터에서 컨텍스트로 메시지를 소비한 메시지 핸들러의 이름을 반환합니다.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx는 라우터에서 컨텍스트로 메시지를 발행하는 방식의 이름을 반환합니다.
// 예를 들어 Kafka의 경우 `kafka.Publisher`일 것입니다.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx는 라우터에서 컨텍스트로 메시지를 구독하는 방식의 이름을 반환합니다.
// 예를 들어 Kafka의 경우 `kafka.Subscriber`일 것입니다.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx는 라우터에서 컨텍스트로 메시지를 수신한 토픽을 반환합니다.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx는 라우터에서 컨텍스트로 메시지를 발행할 토픽을 반환합니다.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...