설치

go get -u github.com/ThreeDotsLabs/watermill

준비

이벤트 기반 애플리케이션의 기본 아이디어는 항상 같습니다: 수신된 메시지를 수신하고 해당에 대응하는 것입니다. Watermill은 이러한 동작을 여러 발행자와 구독자에 대해 구현하는 것을 지원합니다.

Watermill의 핵심 부분은 Message인데, 이는 http 패키지 내의 http.Request만큼 중요합니다. 대부분의 Watermill 기능은 어떤 식으로든 이 구조체를 사용합니다.

PubSub 라이브러리가 제공하는 복잡한 기능에도 불구하고, Watermill에서는 사용을 시작하기 위해 PubSub 라이브러리의 PublisherSubscriber 인터페이스 2개만 구현하는 것이 필요합니다.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

메시지 구독

구독하는 것부터 시작해 보겠습니다. Subscribe는 토픽 이름을 기대하고 수신된 메시지를 받기 위한 채널을 반환합니다. 토픽의 구체적인 의미는 PubSub 구현에 따라 다릅니다.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

지원되는 PubSub의 자세한 예제는 다음 내용을 참조해 주세요.

Go 채널 예제

완전한 예제 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

완전한 예제 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

	// 메시지를 받아서 처리했음을 확인해야 합니다. 그렇지 않으면 여러 번 재전달될 수 있습니다.
	msg.Ack()
}
}

Kafka 예시

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// auto.offset.reset: earliest에 해당함
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("메시지를 받음: %s, 페이로드: %s", msg.UUID, string(msg.Payload))

		// 받고 처리한 메시지를 확인해야 합니다.
		// 그렇지 않으면 메시지가 반복해서 재전송됩니다.
		msg.Ack()
	}
}

RabbitMQ (AMQP) 예시

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// 이 구성은 다음 예제를 기반으로 합니다: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// 이것은 단순한 큐로 사용됩니다.
		//
		// Pub/Sub 스타일 서비스를 구현하려면 다음을 참조하세요.
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("메시지를 받음: %s, 페이로드: %s", msg.UUID, string(msg.Payload))

		// 메시지를 수신 및 처리했음을 확인해야 합니다.
		// 그렇지 않으면 메시지가 반복해서 재전달됩니다.
		msg.Ack()
	}
}

SQL 예시

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/Vmain.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("메시지 수신: %s, 페이로드: %s", msg.UUID, string(msg.Payload))

		// 메시지가 수신되고 처리되었음을 확인해야 합니다. 그렇지 않으면 메시지가 반복적으로 전달됩니다.
		msg.Ack()
	}
}

메시지 생성

Watermill은 메시지 형식을 강제하지 않습니다. NewMessage는 페이로드가 바이트 슬라이스여야 합니다. 문자열, JSON, protobuf, Avro, gob 또는 []byte로 직렬화할 수 있는 기타 형식을 사용할 수 있습니다.

메시지 UUID는 선택 사항이지만 디버깅에 도움이 되므로 권장됩니다.

msg := message.NewMessage(watermill.NewUUID(), []byte("안녕, 세계!"))

메시지 발행

Publish 메서드는 토픽과 하나 이상의 메시지를 발행해야 합니다.

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Go 채널 예시

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("안녕, 세계!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Kafka 예시

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("안녕, 세계!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

RabbitMQ (AMQP) 예시

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

SQL 예시

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

메시지 라우터 사용하기

발행자 및 구독자는 Watermill의 하위 수준 구성 요소입니다. 대부분의 경우 상관 관계, 지표, 독성 큐, 재시도 및 속도 제한과 같은 상위 수준의 인터페이스 및 기능을 사용하고 싶을 것입니다.

메시지가 성공적으로 처리된 후에만 메시지를 승인하고 싶을 수도 있습니다. 다른 경우에는 즉시 메시지를 승인하고 처리를 고려하고 싶을 수도 있습니다. 때로는 수신된 메시지를 기반으로 특정 작업을 수행하고 응답으로 다른 메시지를 게시하고 싶을 수도 있습니다.

이러한 요구 사항을 충족시키기 위해 라우터라는 구성 요소가 있습니다.

메시지 라우터의 예시 응용 프로그램

예시 응용 프로그램의 흐름은 다음과 같습니다:

  1. 매 초마다 incoming_messages_topic에 메시지를 생성합니다.
  2. struct_handler 리스너는 incoming_messages_topic을 처리합니다. 메시지를 받으면 UUID를 인쇄하고 outgoing_messages_topic에 새 메시지를 생성합니다.
  3. print_incoming_messages 핸들러는 incoming_messages_topic에서 청취하고 메시지의 UUID, 페이로드 및 메타데이터를 인쇄합니다.
  4. print_outgoing_messages 핸들러는 outgoing_messages_topic에서 청취하고 메시지의 UUID, 페이로드 및 메타데이터를 인쇄합니다. 상관 ID는 incoming_messages_topic의 메시지와 동일해야 합니다.

라우터 구성

먼저 플러그인 및 미들웨어를 추가하여 라우터를 구성합니다. 그런 다음, 라우터가 사용할 핸들러를 설정합니다. 각 핸들러는 메시지를 독립적으로 처리합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// 이 예제에서는 간단한 로거 구현을 사용합니다.
	// 사용자 고유의 `watermill.LoggerAdapter` 구현을 제공할 수 있습니다.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SIGTERM 신호를 받으면, SignalsHandler가 라우터를 정상적으로 닫습니다.
	// `r.Close()`를 호출하여 라우터를 직접 닫을 수도 있습니다.
	router.AddPlugin(plugin.SignalsHandler)

	// 라우터 수준의 미들웨어는 라우터로 전송된 모든 메시지에 대해 실행됩니다.
	router.AddMiddleware(
		// CorrelationID는 수신된 메시지의 메타데이터에서 상관 ID를 생성된 메시지에 복사합니다.
		middleware.CorrelationID,

		// 핸들러가 오류를 반환하면 핸들러 함수를 다시 시도합니다.
		// MaxRetries에 도달하면 메시지가 Nacked되고 PubSub이 다시 전송하는 책임이 있습니다.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer는 핸들러 내의 패닉을 처리합니다.
		// 이 경우에는 Retry 미들웨어에 에러로 전달합니다.
		middleware.Recoverer,
	)

	// 간단히 하기 위해 gochannel Pub/Sub을 사용합니다.
	// 어떤 Pub/Sub 구현으로 대체해도 효과는 동일합니다.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// 백그라운드에서 일부 입력 메시지를 생성합니다.
	go publishMessages(pubSub)

	// AddHandler는 핸들러 수준의 미들웨어를 추가하거나 핸들러를 중지할 수 있는 핸들러를 반환합니다.
	// 이 예에서는 structHandler 객체의 핸들러를 사용합니다.
	handler := router.AddHandler(
		"struct_handler",          // 핸들러 이름, 고유해야 합니다.
		"incoming_messages_topic", // 이벤트를 읽는 토픽
		pubSub, 
		"outgoing_messages_topic", // 이벤트를 발행하는 토픽
		pubSub,
		structHandler{}.Handler,
	)

	// 핸들러 수준의 미들웨어는 특정 핸들러에 대해서만 실행됩니다.
	// 이러한 종류의 미들웨어는 라우터 수준의 미들웨어와 동일한 방법으로 추가할 수 있습니다.
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("메시지", message.UUID, "의 핸들러별 미들웨어 실행 중")

			return h(message)
		}
	})

	// 디버깅을 위해 "incoming_messages_topic"에서 수신된 모든 메시지를 출력합니다.
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// 디버깅을 위해 "outgoing_messages_topic"로 전송된 모든 이벤트를 출력합니다.
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// 모든 핸들러가 등록된 상태이므로, 라우터를 실행합니다.
	// 실행 중에는 라우터가 블록됩니다.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

수신 메시지

struct_handlerincoming_messages_topic에서 메시지를 소비하기 때문에 우리는 백그라운드에서 publishMessages()를 호출하여 들어오는 트래픽을 시뮬레이션합니다. 참고로 SetCorrelationID 미들웨어를 추가했습니다. 라우터는 모든 생성된 메시지에 상관 ID를 추가할 것입니다(메타데이터에 저장됨).

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("안녕, 세상!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("메시지 %s 전송, 상관 ID: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

핸들러

두 가지 유형의 핸들러 함수를 발견할 수 있습니다:

  1. 함수func(msg *message.Message) ([]*message.Message, error)
  2. 메소드func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

핸들러가 어떤 종속성에 의존하지 않는 함수인 경우에는 첫 번째 옵션을 사용하는 것이 좋습니다. 핸들러가 데이터베이스 핸들, 로거 등의 일부 종속성이 필요한 경우 두 번째 옵션이 유용합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> 메시지 수신: %s\n> %s\n> 메타데이터: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // 여기에 일부 종속성을 추가할 수 있습니다
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler가 메시지를 받음", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("structHandler에 의해 생성된 메시지"))
    return message.Messages{msg}, nil
}

완료!

이 예제는 go run main.go로 실행할 수 있습니다.