Wydawca
Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/pubsub.go
// ...
type Publisher interface {
// Publish publikuje dostarczone wiadomości do określonego tematu.
//
// Publikacja może być synchroniczna lub asynchroniczna - zależy to od implementacji.
//
// Większość implementacji wydawcy nie obsługuje atomowej publikacji wiadomości.
// Oznacza to, że jeśli publikacja jednej z wiadomości zawiedzie, następna nie zostanie opublikowana.
//
// Publish musi być bezpieczne dla wątków.
Publish(topic string, messages ...*Message) error
// Jeśli wydawca działa asynchronicznie, Close powinien opróżnić niewysłane wiadomości.
Close() error
}
// ...
Publikowanie wielu wiadomości
Większość implementacji wydawcy nie obsługuje atomowej publikacji wiadomości. Oznacza to, że jeśli publikacja jednej z wiadomości zawiedzie, następna nie zostanie opublikowana.
Publikowanie asynchroniczne
Publikacja może być synchroniczna lub asynchroniczna - zależy to od implementacji.
Close()
Jeśli wydawca działa asynchronicznie, Close
powinien opróżnić niewysłane wiadomości. Nie zapomnij zamknąć subskrybentów. W przeciwnym razie możesz utracić pewne wiadomości.
Subskrybent
Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/message/pubsub.go
// ...
type Subscriber interface {
// Subskrybent zwraca kanał wyjściowy z wiadomościami z podanego tematu.
// Kanał zostanie zamknięty po wywołaniu Close() na subskrybencie.
//
// Aby odebrać następną wiadomość, na otrzymanej wiadomości należy wywołać `Ack()`.
// Jeśli przetwarzanie wiadomości zawiedzie i wiadomość powinna zostać ponownie dostarczona, należy wywołać `Nack()`.
//
// Gdy dostarczony ctx jest anulowany, subskrybent zamknie subskrypcję i kanał wyjściowy.
// Dostarczony ctx jest ustawiony do wszystkich wygenerowanych wiadomości.
// Po wywołaniu Ack lub Nack na wiadomości, kontekst wiadomości zostanie anulowany.
Subscribe(ctx context.Context, topic string) (
}
Mechanizm Ack/Nack
Subskrybenci są odpowiedzialni za obsługę Ack
i Nack
z wiadomości. Odpowiednia implementacja powinna czekać na Ack
lub Nack
przed konsumpcją następnej wiadomości.
Ważny wskazówka dotycząca implementacji subskrybenta: To kluczowe, aby wysłać Ack/offset do magazynu/agenta wiadomości po Ack od wiadomości Watermill. W przeciwnym razie, jeśli proces zakończy się przed przetworzeniem wiadomości, istnieje możliwość utraty wiadomości.
Close()
Close
zamknie wszystkie subskrypcje i ich kanały wyjściowe, i opróżni offsety jeśli konieczne.
Co najmniej jednokrotne dostarczanie
Watermill jest zbudowany z wykorzystaniem przynajmniej jednokrotnych semantyk dostarczania. Oznacza to, że jeśli wystąpi błąd podczas przetwarzania wiadomości i nie jest możliwe wysłanie Ack
, wiadomość zostanie dostarczona ponownie.
Należy pamiętać o tym i budować swoją aplikację do przetwarzania idempotentnego lub zaimplementować mechanizm ponownej próby.
Niestety, utworzenie ogólnego middleware'a ponownej próby nie jest możliwe, dlatego zachęcamy do zbudowania własnej implementacji.
Ogólne testowanie
Każdy Pub/Sub jest podobny pod względem większości aspektów. Aby uniknąć pisania osobnych testów dla każdej implementacji Pub/Sub, stworzyliśmy zestaw testów, który każdy Pub/Sub powinien zdać.
Te testy można znaleźć w pubsub/tests/test_pubsub.go
.
Wbudowana implementacja
Aby sprawdzić dostępne implementacje Pub/Sub, prosimy odnieść się do obsługiwanego Pub/Sub.
Implementacja niestandardowego Pub/Sub
Aby uzyskać instrukcje dotyczące wprowadzenia obsługi nowego Pub/Sub, prosimy odnieść się do "Implementowanie niestandardowego Pub/Sub".