Publisher e Subscriber sono le parti di livello inferiore di Watermill. Nelle applicazioni pratiche, di solito si desidera utilizzare interfacce e funzioni di alto livello, come associazioni, metriche, code di messaggi inutilizzabili, ritentativi, limiti di velocità, ecc.
A volte, potresti non voler inviare un Ack quando l'elaborazione ha avuto successo. Alcune volte, potresti voler inviare un messaggio dopo che un altro messaggio è stato elaborato.
Per soddisfare queste esigenze, esiste un componente chiamato Router.
Configurazione
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout determina per quanto tempo il router dovrebbe lavorare per i gestori durante la chiusura.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate controlla se ci sono errori nella configurazione del router.
func (c RouterConfig) Validate() error {
return nil
}
// ...
Gestore
Prima di tutto, è necessario implementare la funzione HandlerFunc
:
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc è la funzione chiamata quando un messaggio viene ricevuto.
//
// Quando HandlerFunc non restituisce un errore, msg.Ack() verrà chiamato automaticamente.
//
// Quando HandlerFunc restituisce un errore, verrà chiamato msg.Nack().
//
// Quando msg.Ack() viene chiamato nell'handler e HandlerFunc restituisce un errore,
// msg.Nack() non verrà inviato perché Ack è già stato inviato.
//
// Quando si ricevono più messaggi (a causa dell'invio di msg.Ack() in HandlerFunc o del supporto a più consumatori del Subscriber),
// HandlerFunc verrà eseguito in modo concorrente.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
Successivamente, è necessario utilizzare Router.AddHandler
per aggiungere un nuovo gestore:
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler aggiunge un nuovo gestore.
// Il nome del gestore deve essere univoco. Attualmente, viene utilizzato solo per il debug.
// subscribeTopic è l'argomento da cui il gestore riceverà i messaggi.
// publishTopic è l'argomento da cui i messaggi restituiti dal gestore verranno generati dal Router.
// Quando il gestore deve pubblicare su più argomenti,
// si consiglia di iniettare solo il Publisher al gestore o implementare un middleware,
// che può catturare i messaggi in base ai metadati e pubblicarli su argomenti specifici.
// Se un gestore viene aggiunto mentre il router è già in esecuzione, è necessario chiamare esplicitamente RunHandlers().
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Aggiunta gestore", watermill.LogFields{
"nome_gestore": handlerName,
"argomento": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped is not always waiting for handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler aggiunge un nuovo gestore.
// Questo gestore non può restituire messaggi.
// Quando restituisce un messaggio, si verifica un errore e viene inviato un Nack.
//
// Il nome del gestore deve essere univoco. Attualmente, viene utilizzato solo per il debug.
// subscribeTopic è l'argomento da cui il gestore riceverà i messaggi.
// subscriber è un subscriber utilizzato per consumare i messaggi.
// Se un gestore viene aggiunto mentre il router è già in esecuzione, è necessario chiamare esplicitamente RunHandlers().
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
Fare riferimento all'esempio di utilizzo in "Getting Started". Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler restituisce un gestore che può essere utilizzato per aggiungere un middleware a livello di gestore o fermare i gestori.
gestore := router.AddHandler(
"struct_handler", // nome del gestore, deve essere univoco
"incoming_messages_topic", // topic da cui vengono letti gli eventi
pubSub,
"outgoing_messages_topic", // topic in cui pubblicare gli eventi
pubSub,
structHandler{}.Handler,
)
// Il middleware a livello di gestore viene eseguito solo per gestori specifici
// Questo middleware può essere aggiunto allo stesso modo del middleware a livello di router
gestore.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Esecuzione del middleware specifico del gestore, UUID del messaggio: ", message.UUID)
return h(message)
}
})
// ...
Nessun gestore di pubblicazioni
Non tutti i gestori genereranno un nuovo messaggio. È possibile utilizzare Router.AddNoPublisherHandler
per aggiungere questo tipo di gestore:
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler aggiunge un nuovo gestore.
// Questo gestore non può restituire messaggi.
// Quando restituisce un messaggio, si verificherà un errore e verrà inviato un Nack.
//
// handlerName deve essere univoco e attualmente utilizzato solo per scopi di debug.
//
// subscribeTopic è l'argomento su cui il gestore riceverà i messaggi.
//
// Il subscriber è utilizzato per consumare i messaggi.
//
// Se si aggiunge un gestore a un router che è già in esecuzione, è necessario chiamare esplicitamente RunHandlers().
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
Riconoscimento
Per impostazione predefinita, quando HanderFunc
non restituisce un errore, verrà chiamato msg.Ack()
. Se viene restituito un errore, verrà chiamato msg.Nack()
. Quindi, dopo aver gestito il messaggio, non è necessario chiamare msg.Ack()
o msg.Nack
(ovviamente, è possibile farlo se si desidera).
Produzione dei messaggi
Quando vengono restituiti più messaggi dal gestore, si noti che la maggior parte delle implementazioni di Publisher non supporta la pubblicazione atomica dei messaggi. Se il broker o lo storage non sono disponibili, potrebbero essere generati solo alcuni messaggi e verrà inviato msg.Nack()
.
Se questo è un problema, si consideri di fare in modo che ciascun gestore pubblichi un solo messaggio.
Esecuzione del Router
Per eseguire il router, è necessario chiamare Run()
.
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run esegue tutti i plugin e i gestori e inizia a sottoscriversi agli argomenti dati.
// Questa chiamata viene bloccata durante l'esecuzione del router.
//
// Quando tutti i gestori si fermano (ad esempio perché la sottoscrizione è stata chiusa), il router si fermerà anche.
//
// Per interrompere Run(), è necessario chiamare Close() sul router.
//
// ctx verrà propagato a tutti i sottoscrittori.
//
// Quando tutti i gestori si fermano (ad es.: a causa di connessioni chiuse), anche Run() si fermerà.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
Assicurarsi che il Router sia in esecuzione
Potrebbe essere utile capire se il router è in esecuzione. È possibile farlo utilizzando il metodo Running()
.
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running si chiude quando il router è in esecuzione.
// In altre parole, è possibile attendere che il router sia in esecuzione in questo modo:
// fmt.Println("Avvio del router")
// go r.Run(ctx)
// // fmt.Println("Il router è in esecuzione")
// Avvertenza: per motivi storici, questo canale non sa della chiusura del router: si chiuderà se il router continua a funzionare e poi si spegne.
func (r *Router) Running() chan struct{} {
// ...
}
È inoltre possibile utilizzare la funzione IsRunning
che restituisce un valore booleano:
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning restituisce true quando il router è in esecuzione.
//
// Avviso: per motivi storici, questo metodo non sa dello stato chiuso del router.
// Se si vuole sapere se il router è stato chiuso, utilizzare IsClosed.
func (r *Router) IsRunning() bool {
// ...
}
Spegnimento del router
Per spegnere il router, è necessario chiamare Close()
.
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close chiude in modo corretto il router con un timeout fornito nella configurazione.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
spegnerà tutti i publisher e i subscriber, e attenderà il completamento di tutti gli handler.
Close()
attenderà il timeout impostato in RouterConfig.CloseTimeout
nella configurazione. Se il timeout viene raggiunto, Close()
restituirà un errore.
Aggiunta di handler dopo l'avvio del router
È possibile aggiungere un nuovo handler quando il router è già in esecuzione. Per farlo, è necessario chiamare AddNoPublisherHandler
o AddHandler
, e poi chiamare RunHandlers
.
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers esegue tutti gli handler che sono stati aggiunti dopo Run().
// RunHandlers è idempotente, quindi può essere chiamato più volte in modo sicuro.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
Arresto degli handler in esecuzione
È possibile arrestare solo un handler in esecuzione chiamando Stop()
.
Si noti che il router si spegnerà quando non ci sono handler in esecuzione.
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop arresta l'handler.
// Stop è asincrono.
// È possibile controllare se l'handler è stato arrestato con la funzione Stopped().
func (h *Handler) Stop() {
// ...
Modello di esecuzione
Subscribers possono consumare un singolo messaggio in modo sequenziale o più messaggi in parallelo.
- Il Flusso singolo del messaggio è il metodo più semplice, il che significa che i subscribers non riceveranno nuovi messaggi finché non viene chiamato
msg.Ack()
. - Il Flusso multiplo del messaggio è supportato solo da alcuni subscribers. Iscrivendosi contemporaneamente a più partizioni di topic, è possibile consumare più messaggi in parallelo, anche messaggi che non sono stati precedentemente confermati (ad es. come funzionano i subscriber Kafka). Il router elabora questo modello eseguendo
HandlerFunc
in parallelo.
Si prega di fare riferimento alla documentazione Pub/Sub selezionata per comprendere i modelli di esecuzione supportati.
Middleware
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware ci consente di scrivere qualcosa di simile a un decoratore per HandlerFunc.
// Può eseguire delle operazioni prima (ad esempio, modificare il messaggio consumato) o dopo l'handler (modificare il messaggio generato, confermare/rifiutare il messaggio consumato, gestire gli errori, registrare, ecc.).
//
// Può essere allegato al router utilizzando il metodo `AddMiddleware`.
//
// Esempio:
//
// func ExampleMiddleware(h HandlerFunc) HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("eseguito prima dell'handler")
// messaggiGenerati, err := h(message)
// fmt.Println("eseguito dopo l'handler")
//
// return messaggiGenerati, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
L'elenco completo dei middleware standard può essere trovato in Middlewares.
Plugin
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin è una funzione eseguita quando il router viene avviato.
type RouterPlugin func(*Router) error
// ...
L'elenco completo dei plugin standard può essere trovato in message/router/plugin.
Contesto
Alcuni valori utili sono memorizzati nel context
per ogni messaggio ricevuto dall'handler:
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx restituisce il nome dell'handler del messaggio nel router che ha consumato il messaggio dal contesto.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx restituisce il nome del tipo di publisher del messaggio nel router dal contesto.
// Ad esempio, per Kafka, sarà `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx restituisce il nome del tipo di subscriber del messaggio nel router dal contesto.
// Ad esempio, per Kafka, sarà `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx restituisce il topic da cui è stato ricevuto il messaggio nel router dal contesto.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx restituisce il topic a cui il messaggio sarà pubblicato nel router dal contesto.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...