소개
미들웨어는 이벤트 프레임워크를 확장하고 사용자 정의 기능을 추가하며, 주요 핸들러의 논리와 관련이 없는 중요한 기능을 제공하는 데 사용됩니다. 예를 들어, 핸들러가 오류를 반환한 후에 재시도하거나, 패닉 상태에서 회복하고 핸들러 내에서 스택 트레이스를 캡처하는 등의 기능을 수행할 수 있습니다.
미들웨어 함수 시그니처는 다음과 같이 정의됩니다:
전체 소스 코드: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware를 사용하면 핸들러와 유사한 데코레이터를 작성할 수 있습니다.
// 핸들러 이전에 일부 작업을 수행할 수 있으며(예: 수신된 메시지 수정), 핸들러 이후에도 일부 작업을 수행할 수 있습니다(생성된 메시지 수정, 수신된 메시지에 대한 ACK/NACK, 오류 처리, 로깅 등).
//
// 이것은 `AddMiddleware` 메서드를 사용하여 라우터에 추가할 수 있습니다.
//
// 예:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("핸들러 실행 전")
// 생성된메시지, 오류 := h(message)
// fmt.Println("핸들러 실행 후")
//
// return 생성된메시지, 오류
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
사용법
미들웨어는 라우터의 모든 핸들러에 적용하거나 특정 핸들러에 적용할 수 있습니다. 미들웨어가 라우터에 직접 추가되면 해당 라우터의 모든 핸들러에 적용됩니다. 미들웨어가 특정 핸들러에만 적용되는 경우, 해당 핸들러에 라우터에 추가해야 합니다.
다음은 사용 예입니다:
전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// SIGTERM 신호를 수신할 때 SignalsHandler가 라우터를 안전하게 닫습니다.
// `r.Close()`를 호출하여 라우터를 닫을 수도 있습니다.
router.AddPlugin(plugin.SignalsHandler)
// 라우터 수준의 미들웨어는 라우터로 전송된 모든 메시지에 대해 실행됩니다
router.AddMiddleware(
// CorrelationID는 수신한 메시지 메타데이터에서 상관관계 ID를 생성된 메시지에 복사합니다
middleware.CorrelationID,
// 핸들러가 오류를 반환하면 재시도됩니다.
// 최대 MaxRetries 번까지 재시도되며, 그 이후에는 메시지가 Nacked되어 PubSub에 의해 다시 전송됩니다.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer는 핸들러에서 발생한 패닉을 처리합니다.
// 이 경우에는 Retry 미들웨어에 오류로 전달합니다.
middleware.Recoverer,
)
// 간단하게 gochannel Pub/Sub을 사용하며,
// 다른 Pub/Sub 구현으로 대체할 수 있으며 동일하게 작동합니다.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// 백그라운드에서 일부 수신 메시지를 발행합니다
go publishMessages(pubSub)
// AddHandler는 핸들러 수준의 미들웨어를 추가하거나 핸들러를 중지하는 데 사용할 수 있는 핸들러를 반환합니다
handler := router.AddHandler(
"struct_handler", // 핸들러 이름, 고유해야 함
"incoming_messages_topic", // 이벤트가 읽힐 토픽
pubSub,
"outgoing_messages_topic", // 이벤트가 발행될 토픽
pubSub,
structHandler{}.Handler,
)
// 핸들러 수준의 미들웨어는 특정 핸들러에 대해서만 실행됩니다
// 이러한 미들웨어는 라우터 수준의 미들웨어와 동일한 방식으로 핸들러에 추가할 수 있습니다
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("메시지", message.UUID, "에 대한 핸들러별 미들웨어 실행 중")
return h(message)
}
})
// 디버깅 목적으로 인해 `incoming_messages_topic`에 수신된 모든 메시지를 출력합니다
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// 디버깅 목적으로 인해 `outgoing_messages_topic`으로 전송된 모든 이벤트를 출력합니다
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// 이제 모든 핸들러가 등록되었으므로 라우터를 실행할 수 있습니다.
// 라우터가 실행 중일 때까지 Run은 블록됩니다.
// ...
사용 가능한 미들웨어
Watermill에서 제공하는 재사용 가능한 미들웨어들이 있으며, 직접 미들웨어를 구현할 수도 있습니다. 예를 들어, 특정 유형의 로그 형식에 수신된 각 메시지를 저장하려는 경우 이것이 가장 좋은 방법입니다.
서킷 브레이커
// CircuitBreaker는 핸들러를 서킷 브레이커로 감싸는 미들웨어입니다.
// 구성에 따라 핸들러가 계속 오류를 반환하면 서킷 브레이커는 빠르게 실패합니다.
// 이는 연쇄적인 실패를 방지하는 데 유용합니다.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker는 새로운 CircuitBreaker 미들웨어를 반환합니다.
// 사용 가능한 설정에 대해서는 gobreaker 문서를 참조하십시오.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware는 CircuitBreaker 미들웨어를 반환합니다.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
상관관계
// SetCorrelationID는 메시지의 상관 ID를 설정합니다.
//
// 메시지가 시스템에 들어오면 SetCorrelationID를 호출해야 합니다.
// 요청(예: HTTP)에서 생성된 메시지의 상관 ID는 요청의 상관 ID와 동일해야 합니다.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID는 메시지에서 상관 ID를 반환합니다.
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID는 핸들러에 의해 생성된 모든 메시지에 상관 ID를 추가합니다.
// ID는 핸들러가 받은 메시지 ID를 기반으로 합니다.
//
// CorrelationID가 올바르게 작동하려면 메시지가 시스템에 들어오기 전에 먼저 SetCorrelationID를 호출해야 합니다.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
복제본
// Duplicator는 엔드포인트가 동일성을 보장하기 위해 메시지를 두 번 처리합니다.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
오류 무시
// IgnoreErrors는 핸들러가 특정 명시적으로 정의된 오류를 무시할 수 있는 미들웨어를 제공합니다.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors는 새로운 IgnoreErrors 미들웨어를 생성합니다.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware는 IgnoreErrors 미들웨어를 반환합니다.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
즉시 응답
// InstantAck는 핸들러가 발생한 오류에 상관없이 즉시 들어오는 메시지를 승인합니다.
// 처리량을 향상시키기 위해 사용할 수 있지만,
// 보장되는 전달이 필요한 경우 적어도 한 번 전달을 받을 수 있습니다.
// 순서가 필요한 메시지인 경우 순서가 깨질 수 있습니다.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
독
// PoisonQueue는 처리할 수 없는 메시지를 처리하고 별도의 주제에 발행하는 미들웨어 기능을 제공합니다.
// 그런 다음 본 미들웨어 체인은 계속 실행되고 비즈니스가 계속 진행됩니다.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter는 PoisonQueue와 유사하지만 독 큐 기준을 충족하는 오류를 결정하는 함수를 허용합니다.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
랜덤 실패
// RandomFail은 핸들러를 무작위 확률에 따라 실패하도록합니다. 오류 확률은 범위 (0, 1) 내에 있어야합니다.
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("랜덤 오류가 발생했습니다")
}
return h(message)
}
}
}
// RandomPanic은 핸들러가 무작위 확률에 따라 패닉하도록합니다. 패닉 확률은 범위 (0, 1) 내에 있어야합니다.
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("랜덤 패닉이 발생했습니다")
}
return h(message)
}
}
}
회복기
// RecoveredPanicError는 회수된 패닉의 오류와 해당 스택 추적 정보를 유지합니다.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer는 핸들러에서 발생하는 모든 패닉을 복구하고 스택 추적과 함께 RecoveredPanicError를 반환합니다.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
재시도
// Retry는 오류가 반환되면 핸들러를 다시 시도하는 미들웨어를 제공합니다.
// 재시도 동작, 지수 백오프, 및 최대 경과 시간을 구성할 수 있습니다.
type Retry struct {
// MaxRetries는 시도할 최대 횟수입니다.
MaxRetries int
// InitialInterval은 재시도 사이의 초기 간격입니다. 후속 간격은 Multiplier에 의해 확장됩니다.
InitialInterval time.Duration
// MaxInterval은 재시도의 지수 백오프 상한을 설정합니다.
MaxInterval time.Duration
// Multiplier는 재시도 사이의 대기 간격이 곱해질 요소입니다.
Multiplier float64
// MaxElapsedTime은 재시도의 최대 시간 제한을 설정합니다. 0이면 비활성화됩니다.
MaxElapsedTime time.Duration
// RandomizationFactor는 백오프 시간을 다음 범위 내에서 무작위로 분산합니다:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook은 각 재시도 시도에서 실행할 선택적 함수입니다.
// 현재 재시도 번호가 retryNum을 통해 전달됩니다.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware은 Retry 미들웨어를 반환합니다.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
스로틀
// Throttle은 특정 시간 내에 처리되는 메시지 수를 제한하는 미들웨어를 제공합니다.
// 이를 사용하여 처리되지 않은 긴 대기열에서 실행되는 핸들러의 과부하를 방지할 수 있습니다.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle은 새로운 Throttle 미들웨어를 생성합니다.
// 예시 시간과 카운트: NewThrottle(10, time.Second)는 1초에 10개의 메시지를 나타냅니다.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware은 Throttle 미들웨어를 반환합니다.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// 여러 핸들러가 공유하는 Throttle은 "틱"을 기다립니다.
시간 초과
// Timeout은 지정된 기간 후에 들어오는 메시지의 컨텍스트를 취소합니다.
// 핸들러의 시간 초과에 민감한 기능은 msg.Context().Done()을 수신하여 실패 시기를 알아야 합니다.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}