Interfejs niestandardowego wydawcy/subskrybenta

Aby dodać obsługę niestandardowego wydawcy/subskrybenta, musisz zaimplementować interfejsy message.Publisher i message.Subscriber.

Kompletny kod źródłowy: github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Publisher interface {
    // Publish publikuje dostarczone wiadomości do określonego tematu.
    //
    // Publikacja może być wykonana synchronicznie lub asynchronicznie, w zależności od implementacji.
    //
    // Większość implementacji wydawców nie obsługuje atomowej publikacji wiadomości.
    // Oznacza to, że jeśli publikacja jednej wiadomości nie powiedzie się, następna wiadomość nie zostanie opublikowana.
    //
    // Publish musi być bezpieczny wątkowo.
    Publish(topic string, messages ...*Message) error
    // Jeśli wydawca jest asynchroniczny, Close powinien opróżnić niezapisane wiadomości.
    Close() error
}

// Subscriber to część konsumująca wydawcy/subskrybenta.
type Subscriber interface {
    // Subscribe zwraca kanał wyjściowy dla wiadomości otrzymanych z podanego tematu.
    // Kanał zostanie zamknięty po wywołaniu Close() na subskrybencie.
    //
    // Aby otrzymać następną wiadomość, należy wywołać Ack() na otrzymanej wiadomości.
    // Jeśli przetwarzanie wiadomości zawiedzie i wiadomość powinna być ponownie dostarczona, należy wywołać Nack().
    //
    // Gdy podane ctx jest anulowane, subskrybent zamknie subskrypcję i kanał wyjściowy.
    // Podany ctx jest ustawiony na wszystkich wygenerowanych wiadomościach.
    // Gdy wywoływane są Nack lub Ack na wiadomości, kontekst wiadomości jest anulowany.
    Subscribe(ctx context.Context, topic string) (

Lista zadań do wykonania

Oto kilka rzeczy, których nie powinieneś zapomnieć:

  1. Logowanie (dobre komunikaty i odpowiednie poziomy).
  2. Możliwość zastępowania i konfigurowania enkoderów wiadomości.
  3. Implementacja Close() dla wydawców i subskrybentów powinna być:
    • Idempotentna
    • W stanie działać poprawnie, gdy wydawca lub subskrybent zostanie zablokowany (np. oczekiwanie na Ack)
    • W stanie działać poprawnie, gdy kanał wyjściowy subskrybenta zostanie zablokowany (ponieważ nikt go nie nasłuchuje)
  4. Obsługa Ack() i Nack() dla skonsumowanych wiadomości.
  5. Obsługa ponownego dostarczania skonsumowanych wiadomości za pomocą Nack().
  6. Użycie testów generycznych wydawcy/subskrybenta. Należy sięgnąć po przewodnik po rozwiązywaniu problemów testowych w celu uzyskania wskazówek dotyczących debugowania.
  7. Optymalizacja wydajności.
  8. Dokumentacja GoDocs, Markdownu i przykłady wprowadzające.