Interfaccia Publisher/Subscriber personalizzata
Per aggiungere il supporto per il publisher/subscriber personalizzato, è necessario implementare le interfacce message.Publisher
e message.Subscriber
.
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/pubsub.go
// ...
type Publisher interface {
// Publish pubblica i messaggi forniti nel topic specificato.
//
// La pubblicazione può avvenire in modo sincrono o asincrono, a seconda dell'implementazione.
//
// La maggior parte delle implementazioni dei publisher non supporta la pubblicazione atomica dei messaggi.
// Ciò significa che se un messaggio non riesce a pubblicarsi, il messaggio successivo non verrà pubblicato.
//
// Publish deve essere thread-safe.
Publish(topic string, messages ...*Message) error
// Se il publisher è asincrono, Close dovrebbe svuotare i messaggi non inviati.
Close() error
}
// Subscriber è la parte consumatrice del publisher/subscriber.
type Subscriber interface {
// Subscribe restituisce un canale di output per i messaggi ricevuti dal topic fornito.
// Il canale verrà chiuso quando viene chiamato Close() sul subscriber.
//
// Per ricevere il prossimo messaggio, Ack() deve essere chiamato sul messaggio ricevuto.
// Se il processo del messaggio fallisce e il messaggio deve essere recapitato nuovamente, dovrebbe essere chiamato Nack().
//
// Quando il contesto fornito viene annullato, il subscriber chiuderà la sottoscrizione e il canale di output.
// Il contesto fornito viene impostato su tutti i messaggi generati.
// Quando viene chiamato Nack o Ack sul messaggio, il contesto del messaggio viene annullato.
Subscribe(ctx context.Context, topic string) (
Lista delle attività da completare
Ecco alcuni punti che non dovresti dimenticare:
- Logging (messaggi chiari e livelli appropriati).
- Codificatori di messaggi sostituibili e configurabili.
- L'implementazione di
Close()
per publisher e subscriber dovrebbe essere:- Idempotente
- In grado di funzionare correttamente quando il publisher o il subscriber è bloccato (ad esempio, in attesa di Ack)
- In grado di funzionare correttamente quando il canale di output del subscriber è bloccato (perché non c'è nessuno che lo ascolta)
- Supporto per
Ack()
eNack()
per i messaggi consumati. - Supporto per il rinvio dei messaggi consumati utilizzando
Nack()
. - Utilizzare test generici per publisher/subscriber. Dovresti fare riferimento alla guida di risoluzione dei problemi dei test per consigli di debug.
- Ottimizzazione delle prestazioni.
- GoDocs, documentazione Markdown ed esempi introduttivi.