Publisher und Subscriber sind die untergeordneten Teile von Watermill. In praktischen Anwendungen möchten Sie in der Regel hochrangige Schnittstellen und Funktionen verwenden, wie beispielsweise Verbindungen, Metriken, Warteschlangen für fehlerhafte Nachrichten, Wiederholungen, Begrenzung der Rate usw.
Manchmal möchten Sie vielleicht keine Bestätigung senden, wenn die Verarbeitung erfolgreich ist. Manchmal möchten Sie eine Nachricht senden, nachdem eine andere Nachricht verarbeitet wurde.
Um diese Anforderungen zu erfüllen, gibt es eine Komponente namens Router.
Konfiguration
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout bestimmt, wie lange der Router für Handler arbeiten soll, wenn er geschlossen wird.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate überprüft, ob es Fehler in der Router-Konfiguration gibt.
func (c RouterConfig) Validate() error {
return nil
}
// ...
Handler
Zunächst müssen Sie die Funktion HandlerFunc
implementieren:
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc ist die Funktion, die aufgerufen wird, wenn eine Nachricht empfangen wird.
//
// Wenn HandlerFunc keinen Fehler zurückgibt, wird msg.Ack() automatisch aufgerufen.
//
// Wenn HandlerFunc einen Fehler zurückgibt, wird msg.Nack() aufgerufen.
//
// Wenn msg.Ack() im Handler aufgerufen wird und HandlerFunc einen Fehler zurückgibt,
// wird msg.Nack() nicht gesendet, da Ack bereits gesendet wurde.
//
// Beim Empfangen mehrerer Nachrichten (aufgrund von msg.Ack() im Handler oder Subscriber, der mehrere Verbraucher unterstützt),
// wird HandlerFunc gleichzeitig ausgeführt.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
Als nächstes müssen Sie Router.AddHandler
verwenden, um einen neuen Handler hinzuzufügen:
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler fügt einen neuen Handler hinzu.
// handlerName muss eindeutig sein. Derzeit wird es nur für das Debugging verwendet.
// subscribeTopic ist das Thema, von dem der Handler Nachrichten empfangen wird.
// publishTopic ist das Thema, von dem die zurückgegebenen Nachrichten des Handlers vom Router generiert werden.
// Wenn der Handler auf mehrere Themen veröffentlichen muss,
// wird empfohlen, nur den Publisher dem Handler hinzuzufügen oder Middleware zu implementieren,
// die Nachrichten auf der Grundlage von Metadaten erfassen und an bestimmte Themen veröffentlichen kann.
// Wenn während des Betriebs des Routers ein Handler hinzugefügt wird, muss RunHandlers() explizit aufgerufen werden.
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Handler hinzufügen", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped wartet nicht immer auf handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler fügt einen neuen Handler hinzu.
// Dieser Handler kann keine Nachrichten zurückgeben.
// Wenn er eine Nachricht zurückgibt, tritt ein Fehler auf und ein Nack wird gesendet.
//
// handlerName muss eindeutig sein. Derzeit wird es nur für das Debugging verwendet.
// subscribeTopic ist das Thema, von dem der Handler Nachrichten empfangen wird.
// subscriber ist ein Subscriber, der zum Konsumieren von Nachrichten verwendet wird.
// Wenn während des Betriebs des Routers ein Handler hinzugefügt wird, muss RunHandlers() explizit aufgerufen werden.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
Sehen Sie sich das Beispiel in "Getting Started" an. Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler gibt einen Handler zurück, der verwendet werden kann, um Handler-Level-Middleware hinzuzufügen oder Handler zu stoppen.
handler := router.AddHandler(
"struct_handler", // Handlername, muss eindeutig sein
"incoming_messages_topic", // Thema, aus dem Ereignisse gelesen werden
pubSub,
"outgoing_messages_topic", // Thema zum Veröffentlichen von Ereignissen
pubSub,
structHandler{}.Handler,
)
// Handler-Level-Middleware wird nur für bestimmte Handler ausgeführt
// Diese Middleware kann auf die gleiche Weise wie Router-Level-Middleware hinzugefügt werden
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Ausführung von handler-spezifischer Middleware, Nachrichten UUID: ", message.UUID)
return h(message)
}
})
// ...
Kein Publisher-Handler
Nicht jeder Handler generiert eine neue Nachricht. Sie können Router.AddNoPublisherHandler
verwenden, um diese Art von Handler hinzuzufügen:
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler fügt einen neuen Handler hinzu.
// Dieser Handler kann keine Nachrichten zurückgeben.
// Wenn er eine Nachricht zurückgibt, tritt ein Fehler auf und Nack wird gesendet.
//
// handlerName muss eindeutig sein und wird derzeit nur für Debugging-Zwecke verwendet.
//
// subscribeTopic ist das Thema, auf dem der Handler Nachrichten empfangen wird.
//
// subscriber wird zum Konsumieren von Nachrichten verwendet.
//
// Wenn Sie einen Handler zu einem bereits laufenden Router hinzufügen, müssen Sie explizit RunHandlers() aufrufen.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
Bestätigung
Standardmäßig wird msg.Ack()
aufgerufen, wenn HandlerFunc
keinen Fehler zurückgibt. Wenn ein Fehler zurückgegeben wird, wird msg.Nack()
aufgerufen. Nachdem die Nachricht behandelt wurde, müssen Sie msg.Ack()
oder msg.Nack
nicht aufrufen (natürlich können Sie es tun, wenn Sie möchten).
Nachrichtenproduktion
Wenn der Handler mehrere Nachrichten zurückgibt, beachten Sie bitte, dass die meisten Publisher-Implementierungen keine atomare Veröffentlichung von Nachrichten unterstützen. Wenn der Broker oder die Speicherung nicht verfügbar ist, können nur einige Nachrichten generiert werden und msg.Nack()
wird gesendet.
Wenn dies ein Problem ist, erwägen Sie, dass jeder Handler nur eine Nachricht veröffentlicht.
Router ausführen
Um den Router auszuführen, müssen Sie Run()
aufrufen.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run führt alle Plugins und Handler aus und beginnt mit dem Abonnieren der angegebenen Themen.
// Dieser Aufruf blockiert, während der Router läuft.
//
// Wenn alle Handler stoppen (zum Beispiel, weil das Abonnement geschlossen wurde), wird auch der Router stoppen.
//
// Um Run() zu stoppen, sollten Sie Close() auf dem Router aufrufen.
//
// ctx wird an alle Abonnenten weitergeleitet.
//
// Wenn alle Handler stoppen (z.B.: aufgrund geschlossener Verbindungen), wird auch Run() stoppen.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
Sicherstellen, dass der Router läuft
Es könnte nützlich sein zu verstehen, ob der Router läuft. Dies können Sie mit der Methode Running()
erreichen.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running schließt, wenn der Router läuft.
// Mit anderen Worten, Sie können auf den Lauf des Routers warten, wie folgt:
// fmt.Println("Router wird gestartet")
// go r.Run(ctx)
// // fmt.Println("Router läuft")
// Warnung: Aus historischen Gründen weiß dieses Kanal nicht über das Herunterfahren des Routers Bescheid - er wird schließen, wenn der Router weiter läuft und dann heruntergefahren wird.
func (r *Router) Running() chan struct{} {
// ...
}
Sie können auch die Funktion IsRunning
verwenden, die einen booleschen Wert zurückgibt:
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning gibt true zurück, wenn der Router läuft.
//
// Warnung: Aus historischen Gründen kennt diese Methode den geschlossenen Zustand des Routers nicht.
// Wenn Sie wissen möchten, ob der Router geschlossen wurde, verwenden Sie IsClosed.
func (r *Router) IsRunning() bool {
// ...
}
Router herunterfahren
Um den Router herunterzufahren, muss Close()
aufgerufen werden.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close schließt den Router ordnungsgemäß mit einem im Konfigurationsbereit bereitgestellten Timeout.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
schaltet alle Publisher und Subscriber aus und wartet darauf, dass alle Handler abgeschlossen sind.
Close()
wartet auf den im RouterConfig.CloseTimeout
der Konfiguration festgelegten Timeout. Wenn der Timeout erreicht ist, gibt Close()
einen Fehler zurück.
Hinzufügen von Handlern nach dem Start des Routers
Es ist möglich, einen neuen Handler hinzuzufügen, wenn der Router bereits läuft. Hierfür muss AddNoPublisherHandler
oder AddHandler
aufgerufen werden, gefolgt von RunHandlers
.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers führt alle Handler aus, die nach Run() hinzugefügt wurden.
// RunHandlers ist idempotent und kann daher mehrmals sicher aufgerufen werden.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
Beenden von laufenden Handlern
Sie können nur einen laufenden Handler beenden, indem Sie Stop()
aufrufen.
Beachten Sie, dass der Router heruntergefahren wird, wenn keine laufenden Handler vorhanden sind.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop stoppt den Handler.
// Stop ist asynchron.
// Sie können über die Funktion Stopped() überprüfen, ob der Handler gestoppt wurde.
func (h *Handler) Stop() {
// ...
Ausführungsmodell
Subscriber können eine einzelne Nachricht sequenziell oder mehrere Nachrichten parallel verarbeiten.
- Der Einzelnachrichtenfluss ist die einfachste Methode, bei der Subscriber keine neuen Nachrichten erhalten, bis
msg.Ack()
aufgerufen wird. - Der Mehrerenachrichtenfluss wird nur von bestimmten Subscribern unterstützt. Durch das gleichzeitige Abonnieren mehrerer Themenpartitionen können mehrere Nachrichten parallel verarbeitet werden, sogar Nachrichten, die zuvor nicht bestätigt wurden (z. B. wie Kafka-Abonnenten arbeiten). Der Router verarbeitet dieses Modell, indem er
HandlerFunc
parallel ausführt.
Bitte konsultieren Sie die ausgewählte Pub/Sub-Dokumentation, um die unterstützten Ausführungsmodelle zu verstehen.
Middleware
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware ermöglicht es uns, etwas Ähnliches wie einen Dekorator für HandlerFunc zu schreiben.
// Damit können Operationen vor (z. B. die empfangene Nachricht modifizieren) oder nach dem Handler (die generierte Nachricht modifizieren, die empfangene Nachricht bestätigen/ablehnen, Fehler behandeln, protokollieren usw.) ausgeführt werden.
//
// Es kann dem Router mit der Methode `AddMiddleware` hinzugefügt werden.
//
// Beispiel:
//
// func BeispielMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("vor dem Handler ausgeführt")
// producedMessages, err := h(message)
// fmt.Println("nach dem Handler ausgeführt")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Die vollständige Liste der Standard-Middleware finden Sie in Middlewares.
Plugins
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin ist eine Funktion, die beim Starten des Routers ausgeführt wird.
type RouterPlugin func(*Router) error
// ...
Die vollständige Liste der Standard-Plugins finden Sie in message/router/plugin.
Kontext
Einige nützliche Werte werden im Kontext
für jede vom Handler empfangene Nachricht gespeichert:
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx gibt den Namen des Nachrichtenhandlers im Router zurück, der die Nachricht aus dem Kontext verarbeitet hat.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx gibt den Namen des Nachrichten-Publishertyps im Router aus dem Kontext zurück.
// Zum Beispiel wird es für Kafka `kafka.Publisher` sein.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx gibt den Namen des Nachrichten-Subscriber-Typs im Router aus dem Kontext zurück.
// Zum Beispiel wird es für Kafka `kafka.Subscriber` sein.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx gibt das Thema zurück, von dem die Nachricht im Router aus dem Kontext empfangen wurde.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx gibt das Thema zurück, an das die Nachricht im Router aus dem Kontext veröffentlicht wird.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...