اهمیت انتشار پیام‌ها در یک تراکنش

در برخورد با برنامه‌های مبتنی بر رویداد، گاهی اوقات ممکن است نیاز داشته باشید که وضعیت برنامه را پایدار نگه دارید و پیام‌ها را منتشر کنید تا بخش‌های دیگر سیستم را در جریان اتفاقات قرار دهید. در یک سناریو آرمانی، شما می‌خواهید وضعیت برنامه را پایدار نگه دارید و یک پیام را در یک تراکنش تکیه کن که در صورت عدم انجام این فعالیت‌ها، باعث مشکلات همگرایی داده می‌شود. برای همزمانی انجام داده‌های ذخیره‌سازی و انتشار رویدادها در یک تراکنش، شما باید بتوانید پیام‌ها را به همان پایگاه‌داده‌ای که برای ذخیره‌سازی داده استفاده می‌کنید، منتشر کنید یا یک مکانیزم دومیت‌باک (2PC) پیاده‌سازی کنید. اگر نمی‌خواهید واسط پیام را به پایگاه داده تغییر دهید و نمی‌خواهید چرخه را اختراع کنید، می‌توانید کار خود را با استفاده از اجزای فرستنده از Watermill، ساده‌تر کنید!

اجزای فرستنده

می‌توانید از فرستنده به عنوان یک دیمون پس‌زمینه فکر کنید که منتظر پیام‌ها برای انتشار در پایگاه داده است و اطمینان حاصل می‌کند که در نهایت آن‌ها به وسیله واسط پیام می‌رسد.

برای جعل فرستنده عمومی و شفاف برای استفاده، آن بر روی یک موضوع اختصاصی در پایگاه داده می‌شنود و از یک پخش‌کننده فرستنده تزئین شده استفاده می‌کند تا پیام‌های کپسوله‌شده را ارسال کند. فرستنده آن‌ها را باز کرده و به موضوع مقصد مشخص در واسط پیام ارسال می‌کند.

مثال

بیایید مثال زیر را در نظر بگیریم: یک دستور وجود دارد که مسئول انجام قرعه‌کشی است. باید به طور تصادفی یک کاربر ثبت‌نام شده در سیستم را به عنوان برنده انتخاب کند. همراه با انجام این کار، باید تصمیم خود را با ذخیره یک ورودی پایگاه داده که با شناسه یکتای قرعه‌کشی با شناسه کاربر انتخاب‌شده مرتبط است، ثابت کند. علاوه بر این، به عنوان یک سیستم مبتنی بر رویداد، باید همچنین یک رویداد قرعه‌کشی‌پایان‌یافته را منتشر کند، تا سایر اجزا بتوانند به‌شکل مناسب واکنش نشان دهند. برای دقت - یک جزء مسئول برای ارسال جوایز به برنده قرعه‌کشی وجود دارد. این جزء رویداد قرعه‌کشی‌پایان‌یافته را دریافت خواهد کرد و با استفاده از ورودی پایگاه داده با استفاده از شناسه یکتای قرعه‌کشی مطابقت خواهد داد و برنده را تعیین می‌کند.

در سناریو ما، پایگاه‌داده MySQL است و واسط پیام Google Pub/Sub است، اما ممکن است دو فناوری دیگر نیز باشد.

در هنگام پیاده‌سازی چنین دستوری، می‌توان به روش‌های مختلفی روی آورد. در ادامه، سه تلاش ممکن را معرفی خواهیم کرد و نقاط ضعف آن‌ها را مشخص خواهیم کرد.

ابتدا رویداد منتشر شود، سپس داده‌ها ذخیره شوند

در این رویکرد، دستور ابتدا یک رویداد را منتشر می‌کند و سپس داده‌ها را ذخیره می‌کند. اگرچه این روش بیشتر مواقع به درستی کار می‌کند، اما بیایید سعی کنیم مشکلات احتمالی را شناسایی کنیم.

دستور باید سه عملیات اصلی را انجام دهد:

  1. انتخاب یک کاربر تصادفی به نام A به عنوان برنده.
  2. انتشار رویداد LotteryConcluded برای اطلاع رسانی اتمام قرعه کشی B.
  3. ذخیره در پایگاه داده بر این که قرعه کشی B توسط کاربر A برنده شده است.

هر یک از این مراحل در معرض شکست قرار دارند که می‌تواند جریان دستور را به هم بزند. اگر مرحله اول شکست بخورد، پیامدها جدی نخواهد داشت - ما فقط باید یک خطا را برگردانی کرده و تصور کنیم که کل دستور به عنوان شکست شناخته شده است. هیچ داده‌ای ذخیره نخواهد شد و هیچ رویدادی منتشر نخواهد شد. ما می‌توانیم به سادگی دستور را دوباره اجرا کنیم.

اگر مرحله دوم شکست خورد، هنوز هیچ رویدادی منتشر نکرده‌ایم یا داده را در پایگاه داده ذخیره نکرده‌ایم. ما می‌توانیم دستور را دوباره اجرا کرده و دوباره امتحان کنیم.

جالب‌ترین سناریو آن است که اگر مرحله سوم شکست خورد چه اتفاقی می‌افتد. پس از مرحله دوم، ما قبلاً رویداد را منتشر کرده‌ایم، اما در نهایت هیچ داده‌ای در پایگاه داده ذخیره نخواهد شد. سایر اجزا چشمهای شان را خواهند کرد که قرعه کشی به اتمام رسیده است، اما هیچ برنده‌ای با شناسه قرعه کشی ارسال شده در رویداد مرتبط نخواهد بود. آن‌ها قادر به تأیید برنده نخواهند بود، بنابراین عملیات آن‌ها هم باید به عنوان شکست در نظر گرفته شود.

ما هنوز می‌توانیم از این وضعیت خارج شویم، اما ممکن است نیاز به برخی عملیات دستی داشته باشیم، مانند دوباره اجرای دستور با استفاده از شناسه قرعه کشی از رویدادی که قبلاً ارسال شده است.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. ابتدا رویداد را به Google Cloud Pub/Sub منتشر کرده و سپس داده‌ها را در MySQL ذخیره می‌کند.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// اگر خطا وجود دارد، رویداد ارسال شده اما داده هنوز ذخیره نشده است.
	if err = simulateError(); err != nil {
		logger.Error("Failed to persist data", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

ابتدا داده‌ها را ذخیره کنید، سپس رویداد را منتشر کنید

در رویکرد دوم، ما سعی می‌کنیم به نقاط ضعف روش اول پرداخته و اقدام به رفع آن‌ها کنیم. به منظور جلوگیری از نشتن وضعیت خطا به اجزای خارجی زمانی که وضعیت به درستی در پایگاه داده ذخیره نشده و رویدادها منتشر نشده‌اند، ما ترتیب اقدامات را به‌صورت زیر تغییر خواهیم داد:

  1. انتخاب تصادفی کاربر A به عنوان برنده قرعه‌کشی B.
  2. ذخیره اطلاعات برنده شدن کاربر A در قرعه‌کشی B در پایگاه داده.
  3. منتشر کردن رویداد LotteryConcluded که اعلام می‌کند قرعه‌کشی B به اتمام رسیده است.

مشابه رویکرد اول، اگر دو اقدام اول شکست بخورند، هیچ پیامدی نخواهیم داشت. در صورت شکست سومین اقدام، داده‌های ما در پایگاه داده ذخیره خواهد شد، اما هیچ رویدادی منتشر نخواهد شد. در این حالت، ما وضعیت خرابی را به اجزا خارج از قرعه‌کشی نشان نخواهیم داد. با این وجود، با توجه به رفتار مورد انتظار سیستم، برنده ما قادر به دریافت جایزه نخواهد بود زیرا هیچ رویدادی برای اجرای این عمل به مسئول مربوطه منتقل نشده است.

این مسئله همچنین می‌توانست با عملیات دستی حل شود، به عنوان مثال، به صورت دستی رویداد را منتشر کنیم. اما ما می‌توانیم بهتر عمل کنیم.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. ابتدا داده‌ها را به MySQL ذخیره کنید، سپس به‌صورت مستقیم رویداد را در Google Cloud Pub/Sub منتشر کنید.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

// اگر این اقدام شکست بخورد، داده‌های ما قبلاً ذخیره شده است، اما هیچ رویدادی منتشر نشده است.
	if err = simulateError(); err != nil {
		logger.Error("Failed to publish the event", err, nil)
		return err
	}

err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}
// ...

ذخیره داده و انتشار رویدادها در یک تراکنش

تصور کنید که دستور ما می‌تواند وظایف دوم و سوم را به یکدیگر انجام دهد. آن‌ها به صورت اتمیک تایید می‌شوند، به این معنی که اگر یک وظیفه شکست بخورد، دیگری نمی‌تواند موفق شود. این می‌تواند با بهره‌گیری از مکانیسم تراکنشی که از پیش در بیشتر پایگاه‌داده‌ها پیاده‌سازی شده‌است، به دست آید. مثلاً MySQL که در مثال ما استفاده شده‌است، یکی از آن‌هاست.

برای ذخیره داده و انتشار رویدادها در یک تراکنش واحد، ما باید قادر به انتشار پیام‌ها به MySQL باشیم. چون ما نمی‌خواهیم تا جایی که سیستم کلی را پشتیبانی کند، پیکربندی ناقص پیام را تغییر دهیم تا توسط MySQL پشتیبانی شود، باید راه‌های دیگری برای رسیدن به این هدف پیدا کنیم.

اخبار خوب این است که Watermill تمام ابزارهای لازم را فراهم می‌کند! اگر از پایگاه‌داده‌ای مانند MySQL، PostgreSQL (یا هر پایگاه‌داده اِس‌کیوال دیگری)، Firestore یا Bolt استفاده می‌کنید، می‌توانید پیام‌ها را به آن‌ها ارسال کنید. کامپوننت Forwarder به شما کمک می‌کند تا تمام پیام‌هایی که شما به پایگاه‌داده منتشر می‌کنید را انتخاب کرده و آن‌ها را به پیکربندی‌کننده پیام منتقل کنید.

شما باید اطمینان حاصل کنید که:

  1. دستور شما از یک انتشارکننده کار می‌کند در زمینه تراکنش پایگاه‌داده (مثلاً SQL، Firestore، Bolt).
  2. کامپوننت Forwarder در حال اجراست و از اشتراک‌گیرنده پایگاه‌داده و انتشارکننده پیام‌ها استفاده می‌کند.

در این حالت، دستور ممکن است به این شکل باشد:

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

برای فعال کردن کامپوننت Forwarder برای کار در پس‌زمینه و فوروارد کردن پیام‌ها از MySQL به Google Pub/Sub، باید آن را به این صورت تنظیم کنید:

برای کد کامل، لطفا به github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go مراجعه کنید.

// ...
// ذخیره داده در MySQL و انتشار رویدادها به Google Cloud Pub/Sub در یک تراکنش.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		if err == nil {
			tx.Commit()
		} else {
			logger.Info("بازگردانی تراکنش به دلیل وجود خطا", watermill.LogFields{"error": err.Error()})
			// در صورت خطا، به دلیل بازگردانی تراکنش MySQL، می‌توانیم از وقوع سناریوهای ناخواسته زیر جلوگیری کنیم:
			// - رویداد منتشر شود اما داده ذخیره نشود،
			// - داده ذخیره شود اما رویداد منتشر نشود.
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// دهنده انتشارکننده را با یک پوشش متوجه شده توسط کامپوننت فورواردر تزیین کنید.
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// اعلان انتهایی قرعه‌کشی را منتشر کنید. لطفا توجه داشته باشید که ما از جانب انتشاردهنده MySQL دیکور شده به یک موضوع Google Cloud منتشر می‌کنیم.
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...
اگر می خواهید بیشتر در مورد این مثال بدانید، لطفاً پیاده‌سازی آن را [اینجا](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go) پیدا کنید.