소개

미들웨어는 이벤트 프레임워크를 확장하고 사용자 정의 기능을 추가하며, 주요 핸들러의 논리와 관련이 없는 중요한 기능을 제공하는 데 사용됩니다. 예를 들어, 핸들러가 오류를 반환한 후에 재시도하거나, 패닉 상태에서 회복하고 핸들러 내에서 스택 트레이스를 캡처하는 등의 기능을 수행할 수 있습니다.

미들웨어 함수 시그니처는 다음과 같이 정의됩니다:

전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware를 사용하면 핸들러와 유사한 데코레이터를 작성할 수 있습니다.
// 핸들러 이전에 일부 작업을 수행할 수 있으며(예: 수신된 메시지 수정), 핸들러 이후에도 일부 작업을 수행할 수 있습니다(생성된 메시지 수정, 수신된 메시지에 대한 ACK/NACK, 오류 처리, 로깅 등).
//
// 이것은 `AddMiddleware` 메서드를 사용하여 라우터에 추가할 수 있습니다.
//
// 예:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("핸들러 실행 전")
//			생성된메시지, 오류 := h(message)
//			fmt.Println("핸들러 실행 후")
//
//			return 생성된메시지, 오류
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

사용법

미들웨어는 라우터의 모든 핸들러에 적용하거나 특정 핸들러에 적용할 수 있습니다. 미들웨어가 라우터에 직접 추가되면 해당 라우터의 모든 핸들러에 적용됩니다. 미들웨어가 특정 핸들러에만 적용되는 경우, 해당 핸들러에 라우터에 추가해야 합니다.

다음은 사용 예입니다:

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SIGTERM 신호를 수신할 때 SignalsHandler가 라우터를 안전하게 닫습니다.
	// `r.Close()`를 호출하여 라우터를 닫을 수도 있습니다.
	router.AddPlugin(plugin.SignalsHandler)

	// 라우터 수준의 미들웨어는 라우터로 전송된 모든 메시지에 대해 실행됩니다
	router.AddMiddleware(
		// CorrelationID는 수신한 메시지 메타데이터에서 상관관계 ID를 생성된 메시지에 복사합니다
		middleware.CorrelationID,

		// 핸들러가 오류를 반환하면 재시도됩니다.
		// 최대 MaxRetries 번까지 재시도되며, 그 이후에는 메시지가 Nacked되어 PubSub에 의해 다시 전송됩니다.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer는 핸들러에서 발생한 패닉을 처리합니다.
		// 이 경우에는 Retry 미들웨어에 오류로 전달합니다.
		middleware.Recoverer,
	)

	// 간단하게 gochannel Pub/Sub을 사용하며,
	// 다른 Pub/Sub 구현으로 대체할 수 있으며 동일하게 작동합니다.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// 백그라운드에서 일부 수신 메시지를 발행합니다
	go publishMessages(pubSub)

	// AddHandler는 핸들러 수준의 미들웨어를 추가하거나 핸들러를 중지하는 데 사용할 수 있는 핸들러를 반환합니다
	handler := router.AddHandler(
		"struct_handler",          // 핸들러 이름, 고유해야 함
		"incoming_messages_topic", // 이벤트가 읽힐 토픽
		pubSub,
		"outgoing_messages_topic", // 이벤트가 발행될 토픽
		pubSub,
		structHandler{}.Handler,
	)

	// 핸들러 수준의 미들웨어는 특정 핸들러에 대해서만 실행됩니다
	// 이러한 미들웨어는 라우터 수준의 미들웨어와 동일한 방식으로 핸들러에 추가할 수 있습니다
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("메시지", message.UUID, "에 대한 핸들러별 미들웨어 실행 중")

			return h(message)
		}
	})

	// 디버깅 목적으로 인해 `incoming_messages_topic`에 수신된 모든 메시지를 출력합니다
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// 디버깅 목적으로 인해 `outgoing_messages_topic`으로 전송된 모든 이벤트를 출력합니다
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// 이제 모든 핸들러가 등록되었으므로 라우터를 실행할 수 있습니다.
	// 라우터가 실행 중일 때까지 Run은 블록됩니다.
// ...

사용 가능한 미들웨어

Watermill에서 제공하는 재사용 가능한 미들웨어들이 있으며, 직접 미들웨어를 구현할 수도 있습니다. 예를 들어, 특정 유형의 로그 형식에 수신된 각 메시지를 저장하려는 경우 이것이 가장 좋은 방법입니다.

서킷 브레이커

// CircuitBreaker는 핸들러를 서킷 브레이커로 감싸는 미들웨어입니다.
// 구성에 따라 핸들러가 계속 오류를 반환하면 서킷 브레이커는 빠르게 실패합니다.
// 이는 연쇄적인 실패를 방지하는 데 유용합니다.
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker는 새로운 CircuitBreaker 미들웨어를 반환합니다.
// 사용 가능한 설정에 대해서는 gobreaker 문서를 참조하십시오.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware는 CircuitBreaker 미들웨어를 반환합니다.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

상관관계

// SetCorrelationID는 메시지의 상관 ID를 설정합니다.
//
// 메시지가 시스템에 들어오면 SetCorrelationID를 호출해야 합니다.
// 요청(예: HTTP)에서 생성된 메시지의 상관 ID는 요청의 상관 ID와 동일해야 합니다.
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID는 메시지에서 상관 ID를 반환합니다.
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID는 핸들러에 의해 생성된 모든 메시지에 상관 ID를 추가합니다.
// ID는 핸들러가 받은 메시지 ID를 기반으로 합니다.
//
// CorrelationID가 올바르게 작동하려면 메시지가 시스템에 들어오기 전에 먼저 SetCorrelationID를 호출해야 합니다.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

복제본

// Duplicator는 엔드포인트가 동일성을 보장하기 위해 메시지를 두 번 처리합니다.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

오류 무시

// IgnoreErrors는 핸들러가 특정 명시적으로 정의된 오류를 무시할 수 있는 미들웨어를 제공합니다.
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors는 새로운 IgnoreErrors 미들웨어를 생성합니다.
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware는 IgnoreErrors 미들웨어를 반환합니다.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

즉시 응답

// InstantAck는 핸들러가 발생한 오류에 상관없이 즉시 들어오는 메시지를 승인합니다.
// 처리량을 향상시키기 위해 사용할 수 있지만,
// 보장되는 전달이 필요한 경우 적어도 한 번 전달을 받을 수 있습니다.
// 순서가 필요한 메시지인 경우 순서가 깨질 수 있습니다.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

// PoisonQueue는 처리할 수 없는 메시지를 처리하고 별도의 주제에 발행하는 미들웨어 기능을 제공합니다.
// 그런 다음 본 미들웨어 체인은 계속 실행되고 비즈니스가 계속 진행됩니다.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter는 PoisonQueue와 유사하지만 독 큐 기준을 충족하는 오류를 결정하는 함수를 허용합니다.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

랜덤 실패

// RandomFail은 핸들러를 무작위 확률에 따라 실패하도록합니다. 오류 확률은 범위 (0, 1) 내에 있어야합니다.
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("랜덤 오류가 발생했습니다")
			}
			return h(message)
		}
	}
}

// RandomPanic은 핸들러가 무작위 확률에 따라 패닉하도록합니다. 패닉 확률은 범위 (0, 1) 내에 있어야합니다.
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("랜덤 패닉이 발생했습니다")
			}
			return h(message)
		}
	}
}

회복기

// RecoveredPanicError는 회수된 패닉의 오류와 해당 스택 추적 정보를 유지합니다.
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer는 핸들러에서 발생하는 모든 패닉을 복구하고 스택 추적과 함께 RecoveredPanicError를 반환합니다.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

재시도

// Retry는 오류가 반환되면 핸들러를 다시 시도하는 미들웨어를 제공합니다.
// 재시도 동작, 지수 백오프, 및 최대 경과 시간을 구성할 수 있습니다.
type Retry struct {
	// MaxRetries는 시도할 최대 횟수입니다.
	MaxRetries int

	// InitialInterval은 재시도 사이의 초기 간격입니다. 후속 간격은 Multiplier에 의해 확장됩니다.
	InitialInterval time.Duration
	// MaxInterval은 재시도의 지수 백오프 상한을 설정합니다.
	MaxInterval time.Duration
	// Multiplier는 재시도 사이의 대기 간격이 곱해질 요소입니다.
	Multiplier float64
	// MaxElapsedTime은 재시도의 최대 시간 제한을 설정합니다. 0이면 비활성화됩니다.
	MaxElapsedTime time.Duration
	// RandomizationFactor는 백오프 시간을 다음 범위 내에서 무작위로 분산합니다:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook은 각 재시도 시도에서 실행할 선택적 함수입니다.
	// 현재 재시도 번호가 retryNum을 통해 전달됩니다.
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware은 Retry 미들웨어를 반환합니다.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

스로틀

// Throttle은 특정 시간 내에 처리되는 메시지 수를 제한하는 미들웨어를 제공합니다.
// 이를 사용하여 처리되지 않은 긴 대기열에서 실행되는 핸들러의 과부하를 방지할 수 있습니다.
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle은 새로운 Throttle 미들웨어를 생성합니다.
// 예시 시간과 카운트: NewThrottle(10, time.Second)는 1초에 10개의 메시지를 나타냅니다.
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware은 Throttle 미들웨어를 반환합니다.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// 여러 핸들러가 공유하는 Throttle은 "틱"을 기다립니다.

시간 초과

// Timeout은 지정된 기간 후에 들어오는 메시지의 컨텍스트를 취소합니다.
// 핸들러의 시간 초과에 민감한 기능은 msg.Context().Done()을 수신하여 실패 시기를 알아야 합니다.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}