Znaczenie Publikowania Wiadomości w Transakcji

Podczas pracy z aplikacjami opartymi na zdarzeniach, może się zdarzyć, że konieczne będzie zachowanie stanu aplikacji i opublikowanie wiadomości informujących inne części systemu o właśnie zaistniałym zdarzeniu. W idealnym scenariuszu chcielibyśmy zachować stan aplikacji i opublikować wiadomość w ramach pojedynczej transakcji, gdyż niezrobienie tego może prowadzić do problemów z spójnością danych. Aby jednocześnie zatwierdzać przechowywanie danych i publikować zdarzenia w ramach jednej transakcji, musisz móc publikować wiadomości do tego samego systemu bazodanowego, w którym przechowujesz dane, lub zaimplementować mechanizm dwufazowego zatwierdzania (2PC). Jeśli nie chcesz zmieniać brokera wiadomości w bazie danych i nie chcesz wynajdywać koła na nowo, możesz uprościć swoją pracę, korzystając z komponentu Forwarder w Watermill!

Komponent Forwarder

Możesz rozważać Forwarder jako tło działające w tle, które oczekuje na publikowanie wiadomości do bazy danych i zapewnia, że w końcu trafią one do brokera wiadomości.

Aby uczynić Forwarder ogólnym i przejrzystym w użyciu, nasłuchuje on dedykowanego tematu w bazie danych pośredniej i używa udekorowanego wydawcy Forwarder do wysyłania skapsułkowanych wiadomości. Forwarder rozpakowuje je i wysyła do określonego tematu docelowego w brokerze wiadomości.

Przykład

Rozważmy następujący przykład: mamy polecenie odpowiedzialne za przeprowadzenie losowania loterii. Musi ono losowo wybrać zarejestrowanego użytkownika w systemie jako zwycięzcę. W trakcie tego procesu powinno również zachować swoją decyzję, przechowując wpis w bazie danych łączący unikalne ID loterii z wybranym ID użytkownika. Dodatkowo, jako system oparty na zdarzeniach, powinno również opublikować zdarzenie LoteriaZakończona, aby inne komponenty mogły właściwie zareagować. Aby być precyzyjnym - będzie tam komponent odpowiedzialny za wysyłanie nagród zwycięzcy loterii. Otrzyma ono zdarzenie LoteriaZakończona i zweryfikuje z wpisem w bazie danych przy użyciu osadzonego ID loterii, aby określić zwycięzcę.

W naszym scenariuszu baza danych to MySQL, a broker wiadomości to Google Pub/Sub, ale mogłyby to być dowolne inne dwie technologie.

Podczas implementacji takiego polecenia można podjąć różne podejścia. W dalszej części przedstawimy trzy możliwe próby i zwrócimy uwagę na ich wady.

Najpierw opublikuj wydarzenie, a następnie przechowaj dane

W tym podejściu polecenie najpierw publikuje wydarzenie, a następnie przechowuje dane. Chociaż ta metoda może działać poprawnie w większości przypadków, spróbujmy zidentyfikować potencjalne problemy.

Polecenie musi wykonać trzy podstawowe operacje:

  1. Wybierz losowego użytkownika A jako zwycięzcę.
  2. Opublikuj wydarzenie LotteryConcluded, aby poinformować, że loteria B się zakończyła.
  3. Przechowaj w bazie danych informację, że loteria B została wygrana przez użytkownika A.

Każdy krok jest podatny na awarie, które mogą zakłócić nasz przepływ polecenia. Jeśli pierwszy krok się nie powiedzie, konsekwencje nie będą poważne - wystarczy zwrócić błąd i uznać całe polecenie za nieudane. Żadne dane nie zostaną przechowane, a żadne wydarzenie nie zostanie opublikowane. Możemy po prostu ponownie uruchomić polecenie.

Jeśli drugi krok się nie powiedzie, nadal nie opublikowaliśmy wydarzenia ani nie przechowaliśmy danych w bazie danych. Możemy ponownie uruchomić polecenie i spróbować ponownie.

Najbardziej interesujący scenariusz to sytuacja, gdy trzeci krok zawiedzie. Po drugim kroku już opublikowaliśmy wydarzenie, ale ostatecznie dane nie zostaną przechowane w bazie danych. Inne komponenty otrzymają sygnał, że loteria się zakończyła, ale nie będzie zwycięzcy powiązanego z identyfikatorem loterii wysłanym w wydarzeniu. Nie będą w stanie zweryfikować zwycięzcy, dlatego ich działania również muszą być uznane za nieudane.

Mimo to możemy wyjść z tej sytuacji, ale może to wymagać pewnych operacji manualnych, takich jak ponowne uruchomienie polecenia przy użyciu identyfikatora loterii z wydarzenia, które zostało już wysłane.

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. Najpierw opublikuj wydarzenie w usłudze Google Cloud Pub/Sub, a następnie przechowaj dane w MySQL.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// Jeśli wystąpi błąd, wydarzenie zostało wysłane, ale dane nie zostały jeszcze zapisane.
	if err = simulateError(); err != nil {
		logger.Error("Nie udało się zapisać danych", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

Najpierw przechowaj dane, a następnie opublikuj zdarzenie

W drugim podejściu postaramy się zająć się wadami metody zapisu pierwszego. Aby zapobiec ujawnianiu sytuacji awaryjnych związanych z niepoprawnym zapisem stanu w bazie danych oraz brakiem publikacji zdarzeń, zmienimy kolejność działań następująco:

  1. Losowo wybierz użytkownika A jako zwycięzcę loterii B.
  2. Zapisz informację o wygranej loterii B przez użytkownika A w bazie danych.
  3. Opublikuj zdarzenie LotteryConcluded, informujące o zakończeniu loterii B.

Podobnie jak w pierwszym podejściu, w przypadku niepowodzenia dwóch pierwszych działań nie poniesiemy żadnych konsekwencji. W przypadku niepowodzenia trzeciej akcji nasze dane zostaną zapisane w bazie danych, ale żadne zdarzenie nie zostanie opublikowane. W tym przypadku nie ujawnimy sytuacji awaryjnej komponentom poza loterią. Niemniej jednak, mając na uwadze oczekiwane zachowanie systemu, nasz zwycięzca nie będzie w stanie odebrać nagrody, ponieważ żadne zdarzenie nie zostanie przekazane odpowiedniemu komponentowi do obsługi tej operacji.

Ten problem mógłby zostać również rozwiązany poprzez ręczne działanie, czyli ręczną publikację zdarzenia. Ale możemy zrobić lepiej.

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. Najpierw przechowaj dane w MySQL, a następnie bezpośrednio opublikuj zdarzenie w Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

// Jeśli to się nie powiedzie, nasze dane są już przechowywane, ale żadne zdarzenie nie jest publikowane.
	if err = simulateError(); err != nil {
		logger.Error("Nie udało się opublikować zdarzenia", err, nil)
		return err
	}

err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}
// ...

Przechowywanie danych i Publikowanie zdarzeń w transakcji

Wyobraź sobie, że nasza komenda może wykonywać drugie i trzecie zadanie jednocześnie. Będą one zatwierdzane atomowo, co oznacza, że jeśli jedno zadanie nie powiedzie się, drugie nie może się powieść. Można to osiągnąć, wykorzystując mechanizm transakcji już zaimplementowany w większości baz danych. MySQL używany w naszym przykładzie jest jedną z nich.

Aby przechowywać dane i publikować zdarzenia w pojedynczej transakcji, musimy móc publikować wiadomości do MySQL. Ponieważ nie chcemy zmieniać brokera wiadomości na taki, który jest obsługiwany przez MySQL w całym systemie, musimy znaleźć inne sposoby osiągnięcia tego celu.

Dobra wiadomość to: Watermill dostarcza wszystkie niezbędne narzędzia! Jeśli korzystasz z bazy danych takiej jak MySQL, PostgreSQL (lub jakiejkolwiek innej bazy danych SQL), Firestore lub Bolt, możesz publikować do nich wiadomości. Składnik Forwarder pomoże Ci wybrać wszystkie wiadomości, które publikujesz do bazy danych i przekazywać je do brokera wiadomości.

Musisz zapewnić, że:

  1. Twoja komenda korzysta z wydawcy pracującego w kontekście transakcji bazy danych (np. SQL, Firestore, Bolt).
  2. Składnik Forwarder działa, korzystając z subskrybenta bazy danych i wydawcy brokera wiadomości.

W tym przypadku komenda może wyglądać tak:

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

Aby umożliwić składnikowi Forwarder pracę w tle i przekazywanie wiadomości z MySQL do Google Pub/Sub, musisz skonfigurować go w następujący sposób:

Dla pełnego kodu, proszę odnieść się do github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// Zapisz dane do MySQL i opublikuj zdarzenia w Google Cloud Pub/Sub w transakcji.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			tx.Commit()
		} else {
			logger.Info("Cofanie transakcji z powodu błędu", watermill.LogFields{"error": err.Error()})
			// W przypadku błędu, w wyniku wycofania transakcji MySQL, możemy zapewnić, że następujące niepożądane scenariusze nie wystąpią:
			// - Zdarzenie opublikowane, ale dane niezapisane,
			// - Dane zapisane, ale zdarzenie nieopublikowane.
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// Udekoruj wydawcę kopertą zrozumianą przez składnik forwarder.
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// Opublikuj ogłoszenie o zakończeniu loterii. Zauważ, że publikujemy tutaj do tematu Google Cloud, używając udekorowanego wydawcy MySQL.
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...
Jeśli chcesz dowiedzieć się więcej o tym przykładzie, proszę znaleźć jego implementację [tutaj](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).