Interfejs niestandardowego wydawcy/subskrybenta
Aby dodać obsługę niestandardowego wydawcy/subskrybenta, musisz zaimplementować interfejsy message.Publisher
i message.Subscriber
.
Kompletny kod źródłowy: github.com/ThreeDotsLabs/watermill/message/pubsub.go
// ...
type Publisher interface {
// Publish publikuje dostarczone wiadomości do określonego tematu.
//
// Publikacja może być wykonana synchronicznie lub asynchronicznie, w zależności od implementacji.
//
// Większość implementacji wydawców nie obsługuje atomowej publikacji wiadomości.
// Oznacza to, że jeśli publikacja jednej wiadomości nie powiedzie się, następna wiadomość nie zostanie opublikowana.
//
// Publish musi być bezpieczny wątkowo.
Publish(topic string, messages ...*Message) error
// Jeśli wydawca jest asynchroniczny, Close powinien opróżnić niezapisane wiadomości.
Close() error
}
// Subscriber to część konsumująca wydawcy/subskrybenta.
type Subscriber interface {
// Subscribe zwraca kanał wyjściowy dla wiadomości otrzymanych z podanego tematu.
// Kanał zostanie zamknięty po wywołaniu Close() na subskrybencie.
//
// Aby otrzymać następną wiadomość, należy wywołać Ack() na otrzymanej wiadomości.
// Jeśli przetwarzanie wiadomości zawiedzie i wiadomość powinna być ponownie dostarczona, należy wywołać Nack().
//
// Gdy podane ctx jest anulowane, subskrybent zamknie subskrypcję i kanał wyjściowy.
// Podany ctx jest ustawiony na wszystkich wygenerowanych wiadomościach.
// Gdy wywoływane są Nack lub Ack na wiadomości, kontekst wiadomości jest anulowany.
Subscribe(ctx context.Context, topic string) (
Lista zadań do wykonania
Oto kilka rzeczy, których nie powinieneś zapomnieć:
- Logowanie (dobre komunikaty i odpowiednie poziomy).
- Możliwość zastępowania i konfigurowania enkoderów wiadomości.
- Implementacja
Close()
dla wydawców i subskrybentów powinna być:- Idempotentna
- W stanie działać poprawnie, gdy wydawca lub subskrybent zostanie zablokowany (np. oczekiwanie na Ack)
- W stanie działać poprawnie, gdy kanał wyjściowy subskrybenta zostanie zablokowany (ponieważ nikt go nie nasłuchuje)
- Obsługa
Ack()
iNack()
dla skonsumowanych wiadomości. - Obsługa ponownego dostarczania skonsumowanych wiadomości za pomocą
Nack()
. - Użycie testów generycznych wydawcy/subskrybenta. Należy sięgnąć po przewodnik po rozwiązywaniu problemów testowych w celu uzyskania wskazówek dotyczących debugowania.
- Optymalizacja wydajności.
- Dokumentacja GoDocs, Markdownu i przykłady wprowadzające.