Benutzerdefinierte Publisher/Subscriber-Schnittstelle

Um die Unterstützung für benutzerdefinierte Publisher/Subscriber hinzuzufügen, müssen Sie die Schnittstellen message.Publisher und message.Subscriber implementieren.

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Publisher interface {
    // Publish veröffentlicht die bereitgestellten Nachrichten zum angegebenen Thema.
    //
    // Das Veröffentlichen kann synchron oder asynchron ausgeführt werden, abhängig von der Implementierung.
    //
    // Die meisten Publisher-Implementierungen unterstützen keine atomare Nachrichtenveröffentlichung.
    // Dies bedeutet, dass wenn eine Nachricht nicht veröffentlicht werden kann, die nächste Nachricht auch nicht veröffentlicht wird.
    //
    // Publish muss thread-sicher sein.
    Publish(topic string, messages ...*Message) error
    // Wenn der Publisher asynchron ist, sollte Close nichtgesendete Nachrichten abwickeln.
    Close() error
}

// Subscriber ist der verbrauchende Teil des Publisher/Subscriber.
type Subscriber interface {
    // Subscribe gibt einen Ausgabekanal für Nachrichten zurück, die vom bereitgestellten Thema empfangen wurden.
    // Der Kanal wird geschlossen, wenn Close() für den Subscriber aufgerufen wird.
    //
    // Um die nächste Nachricht zu empfangen, muss Ack() auf der empfangenen Nachricht aufgerufen werden.
    // Wenn die Verarbeitung der Nachricht fehlschlägt und die Nachricht erneut zugestellt werden soll, sollte Nack() aufgerufen werden.
    //
    // Wenn der bereitgestellte ctx abgebrochen wird, wird der Abonnement von dem Subscriber geschlossen und der Ausgabekanal.
    // Der bereitgestellte ctx wird auf alle generierten Nachrichten gesetzt.
    // Wenn Nack oder Ack auf der Nachricht aufgerufen wird, wird der Kontext der Nachricht abgebrochen.
    Subscribe(ctx context.Context, topic string) (

To-Do-Liste

Hier sind ein paar Punkte, die Sie nicht vergessen sollten:

  1. Protokollierung (gute Nachrichten und angemessene Ebenen).
  2. Austauschbare und konfigurierbare Nachrichtencodierer.
  3. Die Implementierung von Close() für Publisher und Subscriber sollte sein:
    • Idempotent
    • In der Lage ordnungsgemäß zu arbeiten, wenn der Publisher oder Subscriber blockiert ist (z. B. auf Ack warten)
    • In der Lage ordnungsgemäß zu arbeiten, wenn der Ausgabekanal des Subscribers blockiert ist (weil niemand zuhört)
  4. Unterstützung für Ack() und Nack() für konsumierte Nachrichten.
  5. Unterstützung für die erneute Zustellung von konsumierten Nachrichten mithilfe von Nack().
  6. Verwendung von generischen Publisher/Subscriber-Tests. Sie sollten sich für Debugging-Tipps auf den Test-Troubleshooting-Leitfaden beziehen.
  7. Leistungsoptimierung.
  8. GoDocs, Markdown-Dokumentation und Einführungsbeispiele.