Bir İşlemde Mesaj Yayınlamanın Önemi
Olay tabanlı uygulamalarla çalışırken, uygulama durumunu kalıcı hale getirmeniz ve diğer sistem parçalarını hemen ne olduğu konusunda bilgilendirmek için mesajlar yayınlamanız gereken zamanlar olabilir. İdeal bir senaryoda, uygulama durumunu kalıcı hale getirmek ve bir mesajı tek bir işlem içinde yayınlamak istersiniz, çünkü bunu yapmamak veri tutarlılık sorunlarına yol açabilir. Veri depolama alanını aynı işlem içinde aynı veritabanına mesaj yayınlamak veya iki aşamalı taahhüt (2PC) mekanizmasını uygulamak için, aynı veritabanına mesaj yayınlamak gerekir veya mesaj yayınlamak için bir iki aşamalı taahhüt (2PC) mekanizması uygulamanız gerekir. Eğer mesaj aracını veritabanından ayrı tutmak istemiyorsanız ve tekerleği yeniden icat etmek istemiyorsanız, Watermill'in Forwarder bileşenini kullanarak işinizi kolaylaştırabilirsiniz!
Forwarder Bileşeni
Forwarder'ı arka planda bekleyen, veritabanına yayınlanacak mesajları bekleyen ve sonunda bunların mesaj aracına ulaşmasını sağlayan bir arka plan servisi gibi düşünebilirsiniz.
Forwarder'ı genel ve kullanımı şeffaf hale getirmek için, ara veritabanı üzerinde belirli bir konuda dinler ve kapsüllenmiş mesajları göndermek için bir süslenmiş Forwarder yayımcısını kullanır. Forwarder bunları açar ve belirtilen hedef konuya mesajları gönderir.
Örnek
Aşağıdaki örneği düşünelim: sistemde kayıtlı bir kullanıcıyı kazanan olarak rastgele seçmekle sorumlu bir komut bulunmaktadır. Bu işlemi yaparken aynı zamanda, benzersiz bir çekiliş kimliğini seçilen kullanıcının kimliği ile ilişkilendiren bir veritabanı girişi depolayarak kararını da kalıcı hale getirmesi gerekir. Ayrıca, olay tabanlı bir sistem olarak, diğer bileşenlerin uygun şekilde tepki gösterebilmesi için ÇekilişSonuçlandı
olayını yayınlamalıdır. Tam olarak - çekiliş kazananına ödül göndermekle sorumlu bir bileşen olacaktır. ÇekilişSonuçlandı
olayını alacak ve gömülü çekiliş kimliğini kullanarak veritabanı girişi ile doğrulayacak.
Bizim senaryomuzda, veritabanı MySQL ve mesaj aracı Google Pub/Sub, ancak başka iki teknoloji de olabilir.
Bu tür bir komutu uygularken çeşitli yaklaşımlar benimsenebilir. Aşağıda, üç olası deneme sunacağız ve kusurlarını belirteceğiz.
Önce Etkinlik Yayınlayın, Sonra Verileri Saklayın
Bu yaklaşımda, komut önce bir etkinlik yayınlar ve ardından verileri saklar. Bu yöntem çoğu durumda doğru çalışabilir, ancak olası sorunları belirlemeye çalışalım.
Komutun üç temel işlemi gerçekleştirmesi gerekmektedir:
- Rastgele bir kullanıcı
A
'yı kazanan olarak seçin. -
LotteryConcluded
etkinliğini yayınlayarakB
lotosunun bittiğini bildirin. - Veritabanına
B
lotosunun kullanıcıA
tarafından kazanıldığını kaydedin.
Her adım arıza riskine sahiptir, bu da komut akışımızı bozabilir. İlk adım başarısız olursa, sonuçlar ciddi olmayacak - sadece bir hata döndürmemiz ve tüm komutu başarısız olarak düşünmemiz gerekecek. Hiçbir veri saklanmayacak ve hiçbir etkinlik yayınlanmayacak. Sadece komutu tekrar çalıştırabiliriz.
Eğer ikinci adım başarısız olursa, henüz etkinliği yayınlamamış veya verileri veritabanına kaydetmemiş oluruz. Komutu tekrar çalıştırabilir ve yeniden deneyebiliriz.
En ilginç senaryo üçüncü adımın başarısız olması durumudur. İkinci adımdan sonra zaten etkinliği yayınlamış olacağız, ancak sonunda veri veritabanına kaydedilmeyecek. Diğer bileşenler, lotonun bittiğine dair sinyali alacak, ancak etkinlikte gönderilen lotosuyla ilişkilendirilmiş bir kazanan olmayacak. Kazananı doğrulayamayacakları için işlemleri de başarısız olarak kabul etmelidir.
Bu durumdan hala kurtulabiliriz, ancak etkinlikten gelen lotosu kullanarak komutu tekrar çalıştırmak gibi bazı manuel işlemlere ihtiyaç duyabiliriz.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 1. Önce Google Cloud Pub/Sub'a etkinliği yayınlayın, sonra verileri MySQL'e kaydedin.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// Eğer bir hata varsa, etkinlik gönderilmiş ancak veri henüz kaydedilmemiş demektir.
if err = simulateError(); err != nil {
logger.Error("Veri saklanamadı", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
Verileri önce depolayın, sonra etkinliği yayınlayın
İkinci yaklaşımda, önceki adres yönteminin eksikliklerini gidermeye çalışacağız. Veritabanında durum doğru bir şekilde kalıcı olarak depolanmadığında ve etkinlikler yayınlanmadığında dış bileşenlere başarısızlık durumunun sızmasını önlemek için, aksiyonların sırasını aşağıdaki gibi değiştireceğiz:
- Kullanıcı
A
'yı rastgeleB
çekilişinin kazananı olarak seçin. - Veritabanında
B
çekilişinin kullanıcıA
tarafından kazanıldığı bilgisini kalıcı hale getirin. -
B
çekilişinin sona erdiğini bildirenLotteryConcluded
etkinliğini yayınlayın.
İlk yaklaşıma benzer şekilde, ilk iki aksiyon başarısız olursa, hiçbir sonucumuz olmaz. Üçüncü aksiyonun başarısız olması durumunda, verilerimiz veritabanında kalıcı hale getirilir, ancak hiçbir etkinlik yayınlanmaz. Bu durumda, çekiliş dışındaki bileşenlere başarısızlık durumunu sızdırmayacağız. Ancak, beklenen sistem davranışını göz önünde bulundurarak, kazananımız bu işlem için sorumlu bileşene etkinlik iletilmediği için ödülü alamayacaktır.
Bu sorun aynı zamanda manuel bir işlemle çözülebilir, yani etkinliğin manuel olarak yayınlanması. Ancak, daha iyisini yapabiliriz.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 2. Önce verileri MySQL'e kalıcı olarak depolayın, ardından etkinliği doğrudan Google Cloud Pub/Sub'a yayınlayın.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// Eğer bu başarısız olursa, verilerimiz zaten depolanmış olacak, ancak hiçbir etkinlik yayınlanmayacak.
if err = simulateError(); err != nil {
logger.Error("Etkinliği yayınlama başarısız oldu", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
Veri Depolama ve Bir İşlemde Olay Yayınlama
Hayal edin ki komutumuz aynı anda ikinci ve üçüncü görevleri gerçekleştirebilsin. Bunlar atomik bir şekilde işlenecek, yani eğer bir görev başarısız olursa diğeri başarılı olamaz. Bu, çoğu veritabanında zaten uygulanan işlem mekanizmasından yararlanılarak elde edilebilir. Örneğimizde kullandığımız MySQL de bunlardan biridir.
Veri depolamak ve olayları tek bir işlem içinde yayınlamak için, MySQL'e mesaj yayınlamamız gerekiyor. Mesaj broker'ını tüm sisteme yayınlamak üzere MySQL tarafından desteklenmesini istemediğimizden, bunu gerçekleştirmek için başka yollar bulmalıyız.
İyi haber şu ki: Watermill, gerekli tüm araçları sağlar! Eğer MySQL, PostgreSQL (veya herhangi bir SQL veritabanı), Firestore veya Bolt gibi bir veritabanı kullanıyorsanız, bu veritabanlarına mesaj yayınlayabilirsiniz. Forwarder bileşeni, veritabanına yayınladığınız tüm mesajları seçmenize ve bunları mesaj broker'ınıza iletmek için size yardımcı olacaktır.
Şunları sağlamalısınız:
- Komutunuz bir veritabanı işlem bağlamında çalışan bir yayıncı kullanmalıdır (örneğin SQL, Firestore, Bolt).
- Forwarder bileşeni, bir veritabanı abonesi ve bir mesaj brokerı yayıncısı kullanarak çalışıyor olmalıdır.
Bu durumda, komut aşağıdaki gibi görünebilir:
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
Forwarder bileşeninin MySQL'den Google Pub/Sub'a mesajları iletmek üzere arka planda çalışmasını etkinleştirmek için aşağıdaki gibi ayarlamalar yapmanız gerekmektedir:
Tam kod için lütfen şuraya bakınız: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// Veritabanına veri kaydet ve Google Cloud Pub/Sub'a olay yayınla işlemi.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("Olaylardan biri gerçekleşmediği için işlem geri alınıyor", watermill.LogFields{"hata": err.Error()})
// Hata durumunda MySQL işlemi geri alındığında aşağıdaki istenmeyen senaryoların gerçekleşmemesini sağlayabiliriz:
// - Olay yayınlandı ancak veri kaydedilmedi,
// - Veri kaydedildi ancak olay yayınlanmadı.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// Yayıncıyı, **Forwarder** bileşeni tarafından anlaşılabilen bir zarf ile süsleyin.
publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// Bir çekiliş sonucu etkinliği duyurusu yayınlayın. Lütfen burada MySQL yayıncısını kullanarak Google Cloud konusuna yayınladığımıza dikkat edin.
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
Bu örneği daha fazla öğrenmek istiyorsanız, uygulamasını [buradan](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go) bulabilirsiniz.