PublisherSubscriber는 Watermill의 하위 수준 구성 요소입니다. 실제 응용 프로그램에서는 보통 연관성, 메트릭, 포이즌 메시지 대기열, 재시도, 속도 제한 등과 같은 고수준 인터페이스와 기능을 사용하려고 합니다.

가끔은 처리가 성공적일 때 Ack을 보내고 싶지 않을 수도 있습니다. 또한, 다른 메시지가 처리된 후에 메시지를 보내고 싶을 수도 있습니다.

이러한 요구 사항을 충족하기 위해 Router라는 구성 요소가 있습니다.

Watermill Router

구성

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout은 라우터가 핸들러가 닫힐 때 얼마 동안 작동해야 하는지를 결정합니다.
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate는 라우터 구성에 오류가 있는지 확인합니다.
func (c RouterConfig) Validate() error {
	return nil
}
// ...

핸들러

먼저 HandlerFunc 함수를 구현해야합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc는 메시지를 수신할 때 호출되는 함수입니다.
//
// HandlerFunc가 오류를 반환하지 않을 때, msg.Ack()가 자동으로 호출됩니다.
//
// HandlerFunc가 오류를 반환할 때, msg.Nack()이 호출됩니다.
//
// 핸들러에서 msg.Ack()가 호출되고 HandlerFunc이 오류를 반환하는 경우, 
// 이미 Ack가 전송되었기 때문에 Nack이 전송되지 않습니다.
//
// 여러 메시지를 수신할 때 (HandlerFunc에 msg.Ack()가 전송되거나 여러 소비자를 지원하는 경우),
// HandlerFunc은 병렬로 실행됩니다.
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

다음으로, Router.AddHandler를 사용하여 새 핸들러를 추가해야합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler는 새 핸들러를 추가합니다.

// handlerName은 고유해야 합니다. 현재는 디버깅에만 사용됩니다.

// subscribeTopic은 핸들러가 메시지를 받게 될 주제입니다.

// publishTopic은 핸들러의 반환된 메시지가 Router에 의해 생성될 주제입니다.

// 핸들러가 여러 주제에 발행해야 할 때는 
// 핸들러에게 Publisher를 주입하거나 메타데이터를 기반으로 메시지를 캡처하고 특정 주제에 발행할 수 있는 미들웨어를 구현하는 것이 권장됩니다.

// 라우터가 이미 실행 중인 상태에서 핸들러를 추가해야 하는 경우, RunHandlers()를 명시적으로 호출해야 합니다.

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("핸들러 추가 중", watermill.LogFields{

		"handler_name": handlerName,

		"topic":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		name:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped is not always waiting for handlerAdded

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler는 새 핸들러를 추가합니다.

// 이 핸들러는 메시지를 반환할 수 없습니다.

// 메시지를 반환하면 오류가 발생하고 Nack이 전송됩니다.

//

// handlerName은 고유해야 합니다. 현재는 디버깅에만 사용됩니다.

// subscribeTopic은 핸들러가 메시지를 받게 될 주제입니다.

// subscriber는 메시지를 소비하는 데 사용되는 구독자입니다.

// 라우터가 이미 실행 중인 상태에서 핸들러를 추가해야 하는 경우, RunHandlers()를 명시적으로 호출해야 합니다.

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

"시작하기"에서 예제 사용법을 참조하십시오. 전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler는 핸들러 레벨의 미들웨어를 추가하거나 핸들러를 중지하는 데 사용할 수 있는 핸들러를 반환합니다.
	handler := router.AddHandler(
		"struct_handler",          // 핸들러 이름, 고유해야 함
		"incoming_messages_topic", // 이벤트가 읽히는 토픽
		pubSub,
		"outgoing_messages_topic", // 이벤트를 발행하는 토픽
		pubSub,
		structHandler{}.Handler,
	)

	// 핸들러 레벨의 미들웨어는 특정 핸들러에 대해서만 실행됩니다.
	// 이 미들웨어는 라우터 레벨의 미들웨어와 동일한 방식으로 추가할 수 있습니다.
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("핸들러별 미들웨어를 실행 중, 메시지 UUID: ", message.UUID)

			return h(message)
		}
	})
// ...

발행자 핸들러 없음

모든 핸들러가 새로운 메시지를 생성하지는 않습니다. Router.AddNoPublisherHandler를 사용하여 이 유형의 핸들러를 추가할 수 있습니다:

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler는 새 핸들러를 추가합니다.
// 이 핸들러는 메시지를 반환할 수 없습니다.
// 메시지를 반환하면 오류가 발생하여 Nack이 전송됩니다.
//
// handlerName은 고유해야 하며 현재는 디버깅 목적으로만 사용됩니다.
//
// subscribeTopic은 핸들러가 메시지를 수신할 주제입니다.
//
// subscriber는 메시지를 소비하는 데 사용됩니다.
//
// 이미 실행 중인 라우터에 핸들러를 추가하는 경우에는 명시적으로 RunHandlers()를 호출해야 합니다.
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

확인

기본적으로 HanderFunc가 오류를 반환하지 않을 때 msg.Ack()가 호출됩니다. 오류가 반환되면 msg.Nack()가 호출됩니다. 따라서 메시지를 처리한 후에는 msg.Ack() 또는 msg.Nack을 호출할 필요가 없습니다 (물론 원한다면 호출할 수 있습니다).

메시지 생성

핸들러에서 여러 메시지가 반환되는 경우 대부분의 발행자 구현은 메시지의 원자적 발행을 지원하지 않음에 유의하십시오. 브로커나 저장소를 사용할 수 없는 경우 일부 메시지만 생성될 수 있으며 msg.Nack()이 전송됩니다.

이것이 문제라면 각 핸들러가 하나의 메시지만 발행하도록 고려하십시오.

라우터 실행

라우터를 실행하려면 Run()을 호출해야 합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run은 모든 플러그인 및 핸들러를 실행하고 지정된 주제를 구독을 시작합니다.
// 이 호출은 라우터가 실행 중인 동안 차단됩니다.
//
// 모든 핸들러가 중지되면(예: 구독이 닫혔기 때문), 라우터도 중지됩니다.
//
// Run()을 중지하려면 라우터에서 Close()를 호출해야 합니다.
//
// ctx는 모든 구독자에게 전파됩니다.
//
// 모든 핸들러가 중지되면(ex: 연결이 닫혔기 때문), Run()도 중지됩니다.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

라우터 실행 확인

라우터가 실행 중인지 여부를 파악하는 것은 유용할 수 있습니다. Running() 메서드를 사용하여 이를 달성할 수 있습니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running은 라우터가 실행 중일 때 닫힙니다.
// 다시 말해, 라우터가 실행 중인지 여부를 다음과 같이 확인할 수 있습니다:

// 	fmt.Println("라우터 시작")
//	go r.Run(ctx)
//	//	fmt.Println("라우터 실행 중")

// 경고: 역사적인 이유로 이 채널은 라우터의 종료에 대해 알지 못합니다. 라우터가 계속 실행되고 종료되면 채널이 닫힐 것입니다.
func (r *Router) Running() chan struct{} {
// ...
}

또한 부울 값이 반환되는 IsRunning 함수를 사용할 수 있습니다:

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning은 라우터가 실행 중일 때 참을 반환합니다.
//
// 경고: 역사적인 이유로 이 메서드는 라우터의 닫힌 상태를 알지 못합니다. 라우터가 닫혔는지 여부를 알고 싶다면 IsClosed를 사용하십시오.
func (r *Router) IsRunning() bool {
// ...
}

라우터 종료

라우터를 종료하려면 Close()를 호출해야 합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close는 구성에서 제공된 타임아웃으로 라우터를 정상적으로 종료합니다.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close()를 호출하면 모든 퍼블리셔 및 구독자가 종료되고 모든 핸들러가 완료될 때까지 대기합니다.

Close()는 구성에서 설정한 RouterConfig.CloseTimeout 시간까지 대기합니다. 시간초과가 발생하면 Close()는 오류를 반환합니다.

라우터 시작 후 핸들러 추가

라우터가 이미 실행 중일 때 새로운 핸들러를 추가할 수 있습니다. 이를 위해서는 AddNoPublisherHandler 또는 AddHandler를 호출한 다음 RunHandlers를 호출해야 합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers는 Run() 이후에 추가된 모든 핸들러를 실행합니다.
// RunHandlers는 멱등성을 가지므로 여러 번 안전하게 호출할 수 있습니다.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

실행 중인 핸들러 중지

Stop()을 호출하여 하나의 실행 중인 핸들러만 중지할 수 있습니다.

라우터에 실행 중인 핸들러가 없을 때 라우터가 종료됨에 유의하십시오.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop은 핸들러를 중지합니다.
// Stop은 비동기적으로 동작합니다.
// 중지된 핸들러인지 확인할 수 있습니다.
func (h *Handler) Stop() {
// ...

실행 모델

구독자는 단일 메시지를 순차적으로 소비하거나 병렬로 여러 메시지를 소비할 수 있습니다.

  • 단일 메시지 흐름은 가장 간단한 방법으로, 구독자가 msg.Ack()가 호출될 때까지 새로운 메시지를 받지 않습니 다.
  • 다중 메시지 흐름은 일부 구독자만 지원합니다. 여러 주제 파티션을 동시에 구독하여 병렬로 여러 메시지를 소비할 수 있으며, 이전에 확인되지 않은 메시지도 소비할 수 있습니다 (예: 카프카 구독자 동작 방식). 라우터는 이 모델을 HandlerFunc를 병렬로 실행함으로써 처리합니다.

지원되는 실행 모델을 이해하려면 선택한 Pub/Sub 문서를 참조하십시오.

미들웨어

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware를 사용하여 HandlerFunc와 유사한 것을 작성할 수 있습니다.
// HandlerMiddleware는 handler 이전에 (예: 수신된 메시지 수정) 또는 이후 (생성된 메시지 수정, 수신된 메시지를 확인 또는 거부, 오류 처리, 로그 기록 등)에 일부 작업을 실행할 수 있습니다.
//
// 이를 라우터에 `AddMiddleware` 메소드를 사용하여 연결할 수 있습니다.
//
// 예:
//
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("handler 이전에 실행")
// 			createdMessages, err := h(message)
// 			fmt.Println("handler 이후에 실행")
//
// 			return createdMessages, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

표준 미들웨어의 전체 목록은 Middlewares에서 찾을 수 있습니다.

플러그인

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin은 라우터 시작 시 실행되는 함수입니다.
type RouterPlugin func(*Router) error

// ...

표준 플러그인의 전체 목록은 message/router/plugin에서 찾을 수 있습니다.

문맥

핸들러가 수신하는 각 메시지에 대해 context에 유용한 값이 저장됩니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx는 라우터에서 컨텍스트로 메시지를 소비한 메시지 핸들러의 이름을 반환합니다.
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx는 라우터에서 컨텍스트로 메시지를 발행하는 방식의 이름을 반환합니다.
// 예를 들어 Kafka의 경우 `kafka.Publisher`일 것입니다.
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx는 라우터에서 컨텍스트로 메시지를 구독하는 방식의 이름을 반환합니다.
// 예를 들어 Kafka의 경우 `kafka.Subscriber`일 것입니다.
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx는 라우터에서 컨텍스트로 메시지를 수신한 토픽을 반환합니다.
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx는 라우터에서 컨텍스트로 메시지를 발행할 토픽을 반환합니다.
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...