Publisher i Subscriber są niższymi częściami Watermill. W praktycznych zastosowaniach zazwyczaj chcesz używać interfejsów i funkcji o wysokim poziomie, takich jak asocjacje, metryki, kolejki zatrutych wiadomości, próby ponowienia, ograniczenia szybkości, itp.

Czasami nie chcesz wysyłać potwierdzenia (Ack), gdy przetwarzanie jest udane. Innym razem chcesz wysłać wiadomość po przetworzeniu innej wiadomości.

Aby sprostać tym wymaganiom, istnieje komponent o nazwie Router.

Watermill Router

Konfiguracja

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout określa, jak długo router powinien pracować dla handlerów podczas zamykania.
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate sprawdza, czy istnieją jakiekolwiek błędy w konfiguracji routera.
func (c RouterConfig) Validate() error {
	return nil
}
// ...

Obsługa

Najpierw musisz zaimplementować funkcję HandlerFunc:

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc to funkcja wywoływana po otrzymaniu wiadomości.
// 
// Gdy HandlerFunc nie zwraca błędu, wywoływane będzie automatycznie msg.Ack().
// 
// Gdy HandlerFunc zwraca błąd, wywoływane będzie msg.Nack().
// 
// Gdy msg.Ack() jest wywoływane w obsłudze i HandlerFunc zwraca błąd,
// msg.Nack() nie zostanie wysłane, ponieważ Ack został już wysłany.
// 
// Przy otrzymywaniu wielu wiadomości (ze względu na wysłanie msg.Ack() w HandlerFunc lub subskrypcję obsługującą wielu odbiorców),
// HandlerFunc zostanie wykonane równolegle.
typ HandlerFunc func(msg *Message) ([]*Message, error)

// ...

Następnie musisz użyć Router.AddHandler, aby dodać nowego obsługującego:

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler dodaje nowego obsługującego.

// Nazwa obsługującego musi być unikalna. Obecnie jest używana tylko do debugowania.

// subscribeTopic to temat, z którego obsługujący otrzymuje wiadomości.

// publishTopic to temat, z którego wiadomości zwrócone przez obsługującego będą generowane przez Router.

// Gdy obsługujący potrzebuje publikować do wielu tematów,

// zaleca się jedynie wstrzyknięcie wydawcy do obsługującego lub implementację oprogramowania pośredniczącego,

// które może przechwycić wiadomości na podstawie metadanych i publikować do konkretnych tematów.

// Jeśli obsługujący jest dodawany, gdy router jest już uruchomiony, należy wywołać RunHandlers() explicite.
func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("Dodanie obsługującego", watermill.LogFields{

		"handler_name": handlerName,

		"topic":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		name:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped is not always waiting for handlerAdded

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler dodaje nowego obsługującego.

// Ten obsługujący nie może zwracać wiadomości.

// Gdy zwraca wiadomość, wystąpi błąd i zostanie wysłany Nack.

//

// Nazwa obsługującego musi być unikalna. Obecnie jest używana tylko do debugowania.

// subscribeTopic to temat, z którego obsługujący otrzymuje wiadomości.

// subscriber to subskrybent używany do konsumowania wiadomości.

// Jeśli obsługujący jest dodawany, gdy router jest już uruchomiony, należy wywołać RunHandlers() explicite.
func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

Odniesienie do przykładowego użycia znajduje się w sekcji "Rozpoczęcie". Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler zwraca obsługę, która może być użyta do dodania poziomu pośrednictwa obsługi lub zatrzymania obsłużycieli.
	obsługa := router.AddHandler(
		"struct_handler",          // nazwa obsługi, musi być unikalna
		"incoming_messages_topic", // temat, z którego są odczytywane zdarzenia
		pubSub,
		"outgoing_messages_topic", // temat do publikowania zdarzeń
		pubSub,
		structHandler{}.Handler,
	)

	// Poziom pośrednictwa obsługi jest wykonywany tylko dla określonych obsług
	// To pośrednictwo można dodać w ten sam sposób, co pośrednictwo na poziomie routera.
	obsługa.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Wykonywanie pośrednictwa specyficznego dla obsługi, identyfikator UUID wiadomości: ", message.UUID)

			return h(message)
		}
	})
// ...

Brak obsługi wydawcy

Nie każdy manipulator wygeneruje nową wiadomość. Możesz użyć Router.AddNoPublisherHandler aby dodać ten typ manipulatora:

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler dodaje nowy manipulator.
// Ten manipulator nie może zwrócić wiadomości.
// Gdy zwraca wiadomość, wystąpi błąd, a zostanie wysłany Nack.
//
// handlerName musi być unikalny i aktualnie używany tylko do celów debugowania.
//
// subscribeTopic to temat, na którym manipulator odbierze wiadomości.
//
// subscriber jest używany do konsumowania wiadomości.
//
// Jeśli dodajesz manipulator do routera, który już działa, musisz wywołać RunHandlers() explicite.
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

Potwierdzenie

Domyślnie, gdy HanderFunc nie zwraca błędu, zostanie wywołane msg.Ack(). Jeśli zwracany jest błąd, zostanie wywołane msg.Nack(). Po obsłużeniu wiadomości, nie musisz wywoływać msg.Ack() ani msg.Nack (oczywiście możesz, jeśli chcesz).

Wysyłanie wiadomości

Gdy manipulator zwraca wiele wiadomości, należy pamiętać, że większość implementacji Publishera nie obsługuje atomowego publikowania wiadomości. Jeśli broker lub magazyn są niedostępne, mogą zostać wygenerowane tylko niektóre wiadomości, a zostanie wysłane msg.Nack().

Jeśli jest to problem, rozważ, czy każdy manipulator powinien publikować tylko jedną wiadomość.

Uruchamianie Routera

Aby uruchomić routera, należy wywołać Run().

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run uruchamia wszystkie wtyczki i manipulatory oraz rozpoczyna subskrypcję podanych tematów.
// To wywołanie blokuje działanie routera.
//
// Gdy wszystkie manipulatory się zatrzymają (na przykład, gdy subskrypcja zostanie zamknięta), router również się zatrzyma.
//
// Aby zatrzymać Run(), należy wywołać Close() na routerze.
//
// ctx będzie propagowane do wszystkich subskrybentów.
//
// Gdy wszystkie manipulatory się zatrzymają (np. z powodu zamkniętych połączeń), Run() również się zatrzyma.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

Upewnienie się, że Router działa

Przydatne może być zrozumienie, czy router działa. Można to osiągnąć, korzystając z metody Running().

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running zamyka się, gdy router działa.
// Innymi słowy, można czekać aż router się uruchomi przy użyciu:

// 	fmt.Println("Uruchamianie routera")
//	go r.Run(ctx)
//	//	fmt.Println("Router działa")

// Ostrzeżenie: Z historycznych powodów ten kanał nie wie o zatrzymaniu routera - zamknie się, jeśli router będzie działał, a następnie zostanie zatrzymany.
func (r *Router) Running() chan struct{} {
// ...
}

Można również użyć funkcji IsRunning, która zwraca wartość logiczną:

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning zwraca true, gdy router działa.
//
// Ostrzeżenie: Z historycznych powodów ta metoda nie wie o zamkniętym stanie routera.
// Jeśli chcesz wiedzieć, czy router został zamknięty, skorzystaj z IsClosed.
func (r *Router) IsRunning() bool {
// ...
}

Wyłączenie routera

Aby wyłączyć router, musisz wywołać Close().

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() wyłączy wszystkich wydawców i subskrybentów oraz poczeka, aż wszystkie obsługiwane zostaną zakończone.

Close() poczeka na upływ timeoutu ustawionego w RouterConfig.CloseTimeout w konfiguracji. Jeśli timeout zostanie osiągnięty, Close() zwróci błąd.

Dodawanie obsługi po uruchomieniu routera

Możesz dodać nową obsługę, gdy router jest już uruchomiony. Aby to zrobić, musisz wywołać AddNoPublisherHandler lub AddHandler, a następnie wywołać RunHandlers.

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

Zatrzymywanie działających obsług

Możesz zatrzymać tylko jedną działającą obsługę, wywołując Stop().

Należy pamiętać, że router zostanie wyłączony, gdy nie będzie żadnych działających obsług.

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if the handler was stopped with the Stopped() function.
func (h *Handler) Stop() {
// ...

Model wykonania

Subskrybenci mogą pobierać pojedynczą wiadomość sekwencyjnie lub wiele wiadomości równolegle.

  • Pojedynczy przepływ wiadomości jest najprostszą metodą, co oznacza, że subskrybenci nie otrzymają żadnych nowych wiadomości, dopóki nie zostanie wywołane msg.Ack().
  • Wielokrotny przepływ wiadomości jest obsługiwany tylko przez określonych subskrybentów. Poprzez subskrybowanie wielu partycji tematów jednocześnie, możliwe jest pobieranie wielu wiadomości równolegle, nawet wiadomości, które wcześniej nie zostały potwierdzone (np. jak działa subskrybent Kafka). Router przetwarza ten model, uruchamiając HandlerFunc równolegle.

Proszę odnieść się do wybranej dokumentacji Pub/Sub, aby zrozumieć obsługiwane modele wykonania.

Middleware

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware allows us to write something similar to a decorator for HandlerFunc.
// It can execute some operations before (e.g., modify the consumed message) or after the handler (modify the generated message, ack/nack the consumed message, handle errors, log, etc.).
//
// It can be attached to the router using the `AddMiddleware` method.
//
// Example:
//
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("executed before handler")
// 			producedMessages, err := h(message)
// 			fmt.Println("executed after handler")
//
// 			return producedMessages, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

Pełną listę standardowych middleware można znaleźć w Middlewares.

Wtyczki

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin is a function executed when the router starts.
type RouterPlugin func(*Router) error

// ...

Pełną listę standardowych wtyczek można znaleźć w message/router/plugin.

Kontekst

Niektóre przydatne wartości są przechowywane w kontekście dla każdej wiadomości odebranej przez obsługującego:

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx zwraca nazwę obsługującego wiadomość w routerze, który odebrał wiadomość z kontekstu.
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx zwraca nazwę rodzaju wydawcy wiadomości w routerze z kontekstu.
// Na przykład, dla Kafki, będzie to `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx zwraca nazwę rodzaju subskrybenta wiadomości w routerze z kontekstu.
// Na przykład, dla Kafki, będzie to `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx zwraca temat, z którego wiadomość została odebrana w routerze z kontekstu.
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx zwraca temat, do którego wiadomość zostanie opublikowana w routerze z kontekstu.
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...