Importanza della Pubblicazione dei Messaggi in una Transazione

Quando si tratta di applicazioni basate su eventi, ci possono essere momenti in cui è necessario persistere lo stato dell'applicazione e pubblicare messaggi per informare altre parti del sistema su ciò che è appena accaduto. In uno scenario ideale, si vorrebbe persistere lo stato dell'applicazione e pubblicare un messaggio all'interno di una singola transazione, poiché non farlo può portare a problemi di consistenza dei dati. Per confermare contemporaneamente lo storage dei dati e pubblicare eventi all'interno di una transazione, è necessario essere in grado di pubblicare messaggi sullo stesso database utilizzato per lo storage dei dati, o implementare un meccanismo di commit a due fasi (2PC). Se non si desidera modificare il message broker nel database e non si vuole reinventare la ruota, è possibile semplificare il lavoro utilizzando il componente Forwarder di Watermill!

Componente Forwarder

Si può pensare al Forwarder come a un demone in background che attende i messaggi da pubblicare sul database e si assicura che raggiungano alla fine il message broker.

Per rendere il Forwarder generico e trasparente da utilizzare, ascolta su un topic dedicato sul database intermedio e utilizza un Publisher del Forwarder decorato per inviare i messaggi incapsulati. Il Forwarder li estrae e li invia al topic di destinazione specificato sul message broker.

Esempio

Consideriamo il seguente esempio: c'è un comando responsabile per condurre una estrazione a sorte. Deve selezionare casualmente un utente registrato nel sistema come vincitore. Mentre fa ciò, dovrebbe anche persistere la sua decisione memorizzando un'entry nel database che collega l'ID univoco della lotteria con l'ID dell'utente selezionato. Inoltre, come sistema basato su eventi, dovrebbe anche pubblicare un evento LotteryConcluded, in modo che altre componenti possano reagire in modo appropriato. Per essere precisi, ci sarà una componente responsabile dell'invio dei premi al vincitore della lotteria. Riceverà l'evento LotteryConcluded e verificherà con l'entry nel database utilizzando l'ID della lotteria incorporato per determinare il vincitore.

Nel nostro scenario, il database è MySQL e il message broker è Google Pub/Sub, ma potrebbero essere altre due tecnologie.

Nell'implementare un tale comando, possono essere adottati vari approcci. Di seguito presenteremo tre possibili tentativi e ne indicheremo i difetti.

Pubblica prima l'evento, quindi archivia i dati

In questo approccio, il comando pubblica prima un evento e poi archivia i dati. Anche se questo metodo può funzionare correttamente nella maggior parte dei casi, cerchiamo di individuare potenziali problemi.

Il comando deve eseguire tre operazioni di base:

  1. Selezionare un utente casuale A come vincitore.
  2. Pubblicare un evento LotteryConcluded per informare che la lotteria B è terminata.
  3. Archiviare nel database che la lotteria B è stata vinta dall'utente A.

Ogni passaggio è soggetto a failure, il che può interrompere il flusso del comando. Se il primo passaggio fallisce, le conseguenze non saranno gravi: è sufficiente restituire un errore e considerare l'intero comando come fallito. Nessun dato verrà archiviato e nessun evento verrà pubblicato. Possiamo semplicemente eseguire nuovamente il comando.

Se il secondo passaggio fallisce, non abbiamo ancora pubblicato l'evento o archiviato i dati nel database. Possiamo eseguire nuovamente il comando e riprovare.

Lo scenario più interessante è cosa succede se il terzo passaggio fallisce. Dopo il secondo passaggio, abbiamo già pubblicato l'evento, ma alla fine non verranno archiviati dati nel database. Altri componenti riceveranno il segnale che la lotteria è terminata, ma non ci sarà alcun vincitore associato all'ID della lotteria inviato nell'evento. Non saranno in grado di verificare il vincitore, quindi anche le loro operazioni devono essere considerate fallite.

Possiamo comunque uscire da questa situazione, ma potrebbe richiedere alcune operazioni manuali, come eseguire nuovamente il comando utilizzando l'ID della lotteria dall'evento che è già stato inviato.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. Pubblica prima l'evento su Google Cloud Pub/Sub, quindi archivia i dati in MySQL.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// Se si verifica un errore, l'evento è stato inviato ma i dati non sono ancora stati salvati.
	if err = simulateError(); err != nil {
		logger.Error("Impossibile archiviare i dati", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

Archivia prima i dati, poi pubblica l'evento

Nel secondo approccio, cercheremo di affrontare i difetti del metodo che prevede di pubblicare l'evento prima. Al fine di evitare di divulgare la situazione di fallimento a componenti esterni quando lo stato non è correttamente persistito nel database e gli eventi non sono stati pubblicati, cambieremo l'ordine delle azioni come segue:

  1. Selezionare casualmente l'utente A come vincitore della lotteria B.
  2. Persistere le informazioni che la lotteria B è stata vinta dall'utente A nel database.
  3. Pubblicare un evento LotteryConcluded, informando che la lotteria B è terminata.

Simile al primo approccio, se le prime due azioni falliscono, non avremo conseguenze. Nel caso del fallimento della terza azione, i nostri dati saranno persistiti nel database, ma nessun evento sarà pubblicato. In questo caso, non divulgheremo la situazione di fallimento a componenti esterni alla lotteria. Tuttavia, considerando il comportamento previsto del sistema, il nostro vincitore non potrà ricevere il premio perché nessun evento viene trasmesso al componente responsabile di questa operazione.

Questo problema potrebbe essere risolto anche mediante un'operazione manuale, ovvero pubblicando manualmente l'evento. Ma possiamo fare di meglio.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. Persistere i dati in MySQL prima di pubblicare direttamente l'evento su Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

// Se questo fallisce, i nostri dati sono già persistiti, ma nessun evento viene pubblicato.
	if err = simulateError(); err != nil {
		logger.Error("Impossibile pubblicare l'evento", err, nil)
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}
// ...

Memorizzazione dei dati e pubblicazione degli eventi in una transazione

Immagina che il nostro comando possa eseguire contemporaneamente il secondo e il terzo compito. Saranno eseguiti atomicamente, il che significa che se uno fallisce, l'altro non può avere successo. Questo può essere realizzato sfruttando il meccanismo di transazione già implementato nella maggior parte dei database. Il MySQL utilizzato nel nostro esempio è uno di essi.

Per memorizzare i dati e pubblicare gli eventi in una singola transazione, è necessario essere in grado di pubblicare messaggi su MySQL. Poiché non vogliamo modificare il message broker per essere supportato da MySQL in tutto il sistema, dobbiamo trovare altri modi per realizzarlo.

La buona notizia è che Watermill fornisce tutti gli strumenti necessari! Se stai utilizzando un database come MySQL, PostgreSQL (o qualsiasi altro database SQL), Firestore o Bolt, puoi pubblicare messaggi su di essi. Il componente Forwarder ti aiuterà a selezionare tutti i messaggi che pubblichi nel database e a inoltrarli al tuo message broker.

Devi assicurarti che:

  1. Il tuo comando utilizzi un publisher che lavori nel contesto di una transazione di database (ad esempio SQL, Firestore, Bolt).
  2. Il componente Forwarder è in esecuzione, utilizzando un subscriber di database e un publisher di message broker.

In questo caso, il comando potrebbe apparire così:

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

Per abilitare il funzionamento del componente Forwarder in background e inoltrare i messaggi da MySQL a Google Pub/Sub, è necessario configurarlo come segue:

Per il codice completo, fare riferimento a github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// Persistere i dati su MySQL e pubblicare eventi su Google Cloud Pub/Sub in una transazione.
func persistDataAndPublishEventInTransaction(idLotteria int, utenteSelezionato string, logger watermill.LoggerAdapter) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			tx.Commit()
		} else {
			logger.Info("Annullamento della transazione a causa di un errore", watermill.LogFields{"error": err.Error()})
			// In caso di errore, a causa dell'annullamento della transazione MySQL, possiamo garantire che non avvengano i seguenti scenari indesiderati:
			// - Evento pubblicato ma dati non memorizzati,
			// - Dati memorizzati ma evento non pubblicato.
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, idLotteria, utenteSelezionato)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// Decorare il publisher con un envelope compreso dal componente forwarder.
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// Pubblicare un annuncio dell'evento di conclusione della lotteria. Si noti che qui pubblichiamo su un topic di Google Cloud utilizzando il publisher MySQL decorato.
	evento := LotteryConcludedEvent{LotteryID: idLotteria}
	payload, err := json.Marshal(evento)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...
Se desideri saperne di più su questo esempio, trova la sua implementazione [qui](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).