Tầm quan trọng của việc xuất bản tin nhắn trong giao dịch

Khi xử lý các ứng dụng dựa trên sự kiện, có những lúc bạn cần lưu trữ trạng thái ứng dụng và xuất bản tin nhắn để thông báo với các phần khác của hệ thống về những gì vừa xảy ra. Trong kịch bản lý tưởng, bạn muốn lưu trạng thái ứng dụng và xuất bản tin nhắn trong một giao dịch duy nhất, vì việc làm ngược lại có thể dẫn đến vấn đề về tính nhất quán dữ liệu. Để cùng một lúc lưu trữ dữ liệu và xuất bản sự kiện trong một giao dịch, bạn phải có khả năng xuất bản tin nhắn đến cùng cơ sở dữ liệu được sử dụng để lưu trữ dữ liệu, hoặc triển khai một cơ chế cam kết hai giai đoạn (2PC). Nếu bạn không muốn thay đổi trình quản lý tin nhắn sang cơ sở dữ liệu và cũng không muốn phải tái phát minh bánh xe, bạn có thể đơn giản hóa công việc của mình bằng cách sử dụng Forwarder component của Watermill!

Forwarder Component

Bạn có thể coi Forwarder như một tiến trình nền đợi tin nhắn được xuất bản đến cơ sở dữ liệu và đảm bảo rằng chúng cuối cùng sẽ đến được với người quản lý tin nhắn.

Để làm cho Forwarder trở nên chung chung và trong suốt để sử dụng, nó lắng nghe trên một chủ đề được dành riêng trên cơ sở dữ liệu trung gian và sử dụng một đơn vị xuất bản Forwarder đã được trang trí để gửi các tin nhắn bọc. Forwarder giải nén chúng và gửi chúng đến chủ đề mục tiêu đã được chỉ định trên người quản lý tin nhắn.

Ví dụ

Hãy xem xét ví dụ sau: có một lệnh chịu trách nhiệm thực hiện việc rút thăm xổ số. Nó phải chọn một người dùng đã đăng ký trong hệ thống một cách ngẫu nhiên làm người chiến thắng. Trong khi làm điều này, nó cũng nên lưu trữ quyết định của mình bằng cách lưu trữ một bản ghi cơ sở dữ liệu liên kết ID xổ số duy nhất với ID người dùng đã chọn. Ngoài ra, như một hệ thống dựa trên sự kiện, nó cũng nên xuất bản một sự kiện Kết thúc Xổ số, để các thành phần khác có thể phản ứng một cách phù hợp. Cụ thể hơn - sẽ có một thành phần chịu trách nhiệm gửi quà cho người chiến thắng xổ số. Nó sẽ nhận sự kiện Kết thúc Xổ số và xác minh với bản ghi cơ sở dữ liệu sử dụng ID xổ số nhúng để xác định người chiến thắng.

Trong tình huống của chúng tôi, cơ sở dữ liệu là MySQL và người quản lý tin nhắn là Google Pub/Sub, nhưng nó cũng có thể là bất kỳ hai công nghệ khác nào.

Khi triển khai một lệnh như vậy, có thể áp dụng các phương pháp khác nhau. Tiếp theo, chúng tôi sẽ giới thiệu ba cố gắng có thể thực hiện và chỉ ra điểm yếu của chúng.

Đăng bài viết Trước, Sau Đó Lưu Dữ Liệu

Trong phương pháp này, lệnh sẽ đăng bài viết trước và sau đó lưu dữ liệu. Mặc dù phương pháp này có thể hoạt động đúng đắn trong hầu hết các trường hợp, chúng ta cùng tìm hiểu vấn đề có thể xảy ra.

Lệnh cần thực hiện ba hoạt động cơ bản:

  1. Chọn người dùng ngẫu nhiên A là người chiến thắng.
  2. Đăng bài viết LotteryConcluded để thông báo rằng kỳ quay số B đã kết thúc.
  3. Lưu trong cơ sở dữ liệu rằng kỳ quay số B đã được người dùng A chiến thắng.

Mỗi bước đều có nguy cơ thất bại, có thể làm gián đoạn luồng của lệnh chúng ta. Nếu bước đầu tiên thất bại, hậu quả không nghiêm trọng - chúng ta chỉ cần trả về một lỗi và xem xét rằng toàn bộ lệnh đã thất bại. Không có dữ liệu nào sẽ được lưu trữ và không có bài viết nào sẽ được đăng bài. Chúng ta có thể đơn giản là chạy lại lệnh từ đầu.

Nếu bước thứ hai thất bại, chúng ta vẫn chưa đăng bài viết hoặc lưu dữ liệu trong cơ sở dữ liệu. Chúng ta có thể chạy lại lệnh và thử lại.

Kịch bản thú vị nhất là khi bước thứ ba thất bại. Sau bước thứ hai, chúng ta đã đăng bài viết, nhưng cuối cùng không có dữ liệu nào được lưu trong cơ sở dữ liệu. Các thành phần khác sẽ nhận được tín hiệu kỳ quay số đã kết thúc, nhưng sẽ không có người chiến thắng nào được liên kết với ID kỳ quay số được gửi trong sự kiện. Họ sẽ không thể xác minh người chiến thắng, vì vậy họ cũng phải xem xét hoạt động của họ như đã thất bại.

Chúng ta vẫn có thể thoát khỏi tình huống này, nhưng có thể yêu cầu một số hoạt động thủ công, ví dụ như chạy lại lệnh sử dụng ID kỳ quay số từ sự kiện đã được gửi.

Mã nguồn hoàn chỉnh: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. Đăng bài viết lên Google Cloud Pub/Sub trước, sau đó lưu dữ liệu vào MySQL.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// Nếu có lỗi, sự kiện đã được gửi nhưng dữ liệu vẫn chưa được lưu.
	if err = simulateError(); err != nil {
		logger.Error("Lưu dữ liệu thất bại", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

Lưu trữ dữ liệu trước, sau đó công bố sự kiện

Trong phương pháp thứ hai, chúng ta sẽ cố gắng giải quyết nhược điểm của phương pháp lưu trữ trước. Để ngăn chặn tình huống thất bại được tiết lộ cho các thành phần bên ngoài khi trạng thái không được lưu đúng cách trong cơ sở dữ liệu và các sự kiện không được công bố, chúng tôi sẽ thay đổi thứ tự các hành động như sau:

  1. Ngẫu nhiên chọn người dùng A là người chiến thắng của xổ số B.
  2. Lưu thông tin rằng xổ số B đã được người dùng A chiến thắng vào cơ sở dữ liệu.
  3. Công bố sự kiện LotteryConcluded, thông báo rằng xổ số B đã kết thúc.

Tương tự như phương pháp đầu tiên, nếu hai hành động đầu thất bại, chúng ta không có hậu quả. Trong trường hợp hành động thứ ba thất bại, dữ liệu của chúng ta sẽ được lưu trong cơ sở dữ liệu, nhưng không có sự kiện nào được công bố. Trong trường hợp này, chúng ta sẽ không tiết lộ tình huống thất bại cho các thành phần bên ngoài của xổ số. Tuy nhiên, khi xem xét hành vi của hệ thống, người chiến thắng của chúng ta sẽ không thể nhận được giải thưởng vì không có sự kiện nào được chuyển đến thành phần chịu trách nhiệm cho hoạt động này.

Vấn đề này cũng có thể được giải quyết bằng cách thủ công, tức là, công bố sự kiện bằng tay. Nhưng chúng ta có thể làm tốt hơn.

Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. Lưu trữ dữ liệu vào MySQL trước, sau đó trực tiếp công bố sự kiện đến Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

// Nếu thất bại, dữ liệu của chúng ta đã được lưu, nhưng không có sự kiện nào được công bố.
	if err = simulateError(); err != nil {
		logger.Error("Công bố sự kiện thất bại", err, nil)
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}
// ...

Lưu trữ Dữ liệu và Xuất bản Sự kiện trong Giao dịch

Hãy tưởng tượng rằng lệnh của chúng ta có thể thực hiện các công việc thứ hai và thứ ba cùng một lúc. Chúng sẽ được cam kết một cách nguyên tử, có nghĩa là nếu một công việc thất bại, công việc kia không thể thành công. Điều này có thể được đạt được bằng cách tận dụng cơ chế giao dịch đã triển khai trong hầu hết các cơ sở dữ liệu. MySQL được sử dụng trong ví dụ của chúng ta là một trong số đó.

Để lưu trữ dữ liệu và xuất bản các sự kiện trong một giao dịch duy nhất, chúng ta cần có khả năng xuất bản các tin nhắn đến MySQL. Vì chúng ta không muốn thay đổi trình xử lý tin nhắn để được hỗ trợ bởi MySQL trong toàn bộ hệ thống, chúng ta phải tìm cách khác để đạt được điều này.

Thông tin tốt là: Watermill cung cấp tất cả các công cụ cần thiết! Nếu bạn đang sử dụng cơ sở dữ liệu như MySQL, PostgreSQL (hoặc bất kỳ cơ sở dữ liệu SQL nào khác), Firestore hoặc Bolt, bạn có thể xuất bản các tin nhắn đến chúng. Thành phần Forwarder sẽ giúp bạn chọn tất cả các tin nhắn mà bạn xuất bản đến cơ sở dữ liệu và chuyển tiếp chúng đến trình xử lý tin nhắn của bạn.

Bạn cần đảm bảo rằng:

  1. Lệnh của bạn sử dụng một trình xuất bản hoạt động trong ngữ cảnh của một giao dịch cơ sở dữ liệu (ví dụ: SQL, Firestore, Bolt).
  2. Thành phần Forwarder đang hoạt động, sử dụng một trình đăng ký cơ sở dữ liệu và một trình xuất bản trình xử lý tin nhắn.

Trong trường hợp này, lệnh có thể trông như sau:

Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

Để kích hoạt thành phần Forwarder để hoạt động trong nền và chuyển tiếp các tin nhắn từ MySQL đến Google Pub/Sub, bạn cần thiết lập như sau:

Đối với mã nguồn đầy đủ, vui lòng tham khảo github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// Lưu trữ dữ liệu vào MySQL và xuất bản sự kiện đến Google Cloud Pub/Sub trong một giao dịch.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			tx.Commit()
		} else {
			logger.Info("Quay lui giao dịch do có lỗi", watermill.LogFields{"lỗi": err.Error()})
			// Trong trường hợp có lỗi, do lệnh quay lui giao dịch của MySQL, chúng ta có thể đảm bảo rằng các tình huống không mong muốn sau đây không xảy ra:
			// - Sự kiện được xuất bản nhưng dữ liệu không được lưu trữ,
			// - Dữ liệu được lưu trữ nhưng sự kiện không được xuất bản.
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// Tạo ra một trình xuất bản được trang trí bằng một phong bì được hiểu bởi thành phần forwarder.
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// Xuất bản một thông báo về sự kiện kết thúc xổ số. Lưu ý rằng chúng ta xuất bản đến một chủ đề Google Cloud ở đây sử dụng trình xuất bản MySQL đã được trang trí.
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...
Nếu bạn muốn tìm hiểu thêm về ví dụ này, vui lòng tìm thấy việc triển khai của nó [tại đây](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go).