Interfaccia Publisher/Subscriber personalizzata

Per aggiungere il supporto per il publisher/subscriber personalizzato, è necessario implementare le interfacce message.Publisher e message.Subscriber.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Publisher interface {
    // Publish pubblica i messaggi forniti nel topic specificato.
    //
    // La pubblicazione può avvenire in modo sincrono o asincrono, a seconda dell'implementazione.
    //
    // La maggior parte delle implementazioni dei publisher non supporta la pubblicazione atomica dei messaggi.
    // Ciò significa che se un messaggio non riesce a pubblicarsi, il messaggio successivo non verrà pubblicato.
    //
    // Publish deve essere thread-safe.
    Publish(topic string, messages ...*Message) error
    // Se il publisher è asincrono, Close dovrebbe svuotare i messaggi non inviati.
    Close() error
}

// Subscriber è la parte consumatrice del publisher/subscriber.
type Subscriber interface {
    // Subscribe restituisce un canale di output per i messaggi ricevuti dal topic fornito.
    // Il canale verrà chiuso quando viene chiamato Close() sul subscriber.
    //
    // Per ricevere il prossimo messaggio, Ack() deve essere chiamato sul messaggio ricevuto.
    // Se il processo del messaggio fallisce e il messaggio deve essere recapitato nuovamente, dovrebbe essere chiamato Nack().
    //
    // Quando il contesto fornito viene annullato, il subscriber chiuderà la sottoscrizione e il canale di output.
    // Il contesto fornito viene impostato su tutti i messaggi generati.
    // Quando viene chiamato Nack o Ack sul messaggio, il contesto del messaggio viene annullato.
    Subscribe(ctx context.Context, topic string) (

Lista delle attività da completare

Ecco alcuni punti che non dovresti dimenticare:

  1. Logging (messaggi chiari e livelli appropriati).
  2. Codificatori di messaggi sostituibili e configurabili.
  3. L'implementazione di Close() per publisher e subscriber dovrebbe essere:
    • Idempotente
    • In grado di funzionare correttamente quando il publisher o il subscriber è bloccato (ad esempio, in attesa di Ack)
    • In grado di funzionare correttamente quando il canale di output del subscriber è bloccato (perché non c'è nessuno che lo ascolta)
  4. Supporto per Ack() e Nack() per i messaggi consumati.
  5. Supporto per il rinvio dei messaggi consumati utilizzando Nack().
  6. Utilizzare test generici per publisher/subscriber. Dovresti fare riferimento alla guida di risoluzione dei problemi dei test per consigli di debug.
  7. Ottimizzazione delle prestazioni.
  8. GoDocs, documentazione Markdown ed esempi introduttivi.