CQRS Mekanizması
CQRS, "Command Query Responsibility Segregation" teriminin kısaltmasıdır. Bu, komut (yazma istekleri) ve sorgu (okuma istekleri) sorumluluğunu ayırır. Yazma istekleri ve okuma istekleri farklı nesneler tarafından ele alınır.
Bu, CQRS'dir. Veri depolama, ayrı okuma ve yazma depolamalarına sahip olarak daha da ayrılabilir. Bu yapıldıktan sonra, farklı türde sorguları işlemek için optimize edilmiş birçok okuma depolaması veya birçok sınırlı bağlamı kapsayabilir. Ayrı okuma/yazma depolaması genellikle CQRS ile ilgili tartışma konusudur, ancak CQRS kendisi değildir. CQRS sadece komut ve sorgunun ilk ayrılmasıdır.
cqrs
bileşeni, CQRS desenini uygulamaya yardımcı olmak için Pub/Sub ve Router üzerine inşa edilmiş bazı faydalı soyutlamalar sunar.
Tüm CQRS'yi uygulamanız gerekmez. Genellikle, bileşenin yalnızca olay kısmı, olay odaklı uygulamalar oluşturmak için kullanılır.
Yapı Taşları
Olaylar
Olaylar, zaten gerçekleşmiş bir şeyi temsil eder. Olaylar değiştirilemez.
Olay Otobüsü
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus, olayları olay işleyicilere taşır.
type EventBus struct {
// ...
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// GeneratePublishTopic, olayları yayınlama konusu adını oluşturmak için kullanılır.
GeneratePublishTopic GenerateEventPublishTopicFn
// Yayınlamadan önce çağrılan OnPublish, *message.Message'ı değiştirebilir.
//
// Bu seçenek zorunlu değildir.
OnPublish OnEventSendFn
// Kodlayıcı ve kod çözücü olarak kullanılan Marshaler.
// Bu zorunludur.
Marshaler CommandEventMarshaler
// Günlükleme için Logger örneği. Sağlanmazsa, watermill.NopLogger kullanılır.
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Olay İşleyici
Tam kod: github.com/ThreeDotsLabs/su_değirmeni/bileşenler/cqrs/olay_işleyici.go
// ...
// Olayİşleyici, olay veri otobüsünden alınan olayları işleyecek olan Olayİşleyici'yi belirlemek için kullanılır.
type Olayİşleyici struct {
// ...
Tam kod: github.com/ThreeDotsLabs/su_değirmeni/bileşenler/cqrs/olay_işleyici.go
// ...
type OlayİşleyiciYapılandırma struct {
// GenerateAboneOlKonu, olaylara abone olmak için konu oluşturmak için kullanılır.
// Eğer olay işleyici, işleyici grupları kullanıyorsa, GenerateAboneOlKonu kullanılır.
GenerateAboneOlKonu OlayİşleyiciGenerateAboneOlKonuFn
// AboneOluşturucu, Olayİşleyici için bir abonenin oluşturulması için kullanılır.
//
// Bu işlev, her BirOlayİşleyici örneği için bir kez çağrılır.
// Birden fazla işleyici için bir aboneyi yeniden kullanmak istiyorsanız, GroupOlayİşleyici kullanın.
AboneOluşturucu OlayİşleyiciAboneOluşturucuFn
// HandleÖncesinde, olay işlemeden önce çağrılır.
// HandleÖncesinde, ara yazılım gibi çalışır: olay işlemeden önce ve sonra ek mantık enjekte edebilirsiniz.
//
// Bu nedenle, params.Handler.Handle() 'ı açıkça çağırmanız gerekmektedir olayı işlemek için.
//
// func(params EventProcessorOnHandleParams) (err error) {
// // İşlemeden önce mantık
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // İşlemden sonra mantık
// // (...)
// return err
// }
//
// Bu seçenek zorunlu değildir.
HandleÖncesinde OlayİşleyiciHandleÖncesindeFn
// TanımsızOlaydaGeçerleme, olayın tanımlı bir işleyiciye sahip olmadığında mesajın kabul edilip edilmeyeceğini belirlemek için kullanılır.
TanımsızOlaydaGeçerleme bool
// Marşaller, olayları marşalleme ve serbest bırakma işlemek için kullanılır.
// Gerekli.
Marşaller KomutOlayMarşallayıcı
// Günlükçü örneği için günlükleme.
// Sağlanmadıysa, watermill.NopGünlükçü kullanılacaktır.
Günlükçü watermill.GünlükçüAdaptörü
// disableRouterAutoAddHandlers, geriye dönük uyumluluğu korumak içindir.
// Bu değer, NewOlayİşleyici kullanılarak Olayİşleyici oluşturulurken ayarlanır.
// Artık kullanılmayan: NewOlayİşleyiciWithConfig'a göç edin.
disableRouterAutoAddHandlers bool
}
func (c *OlayİşleyiciYapılandırma) varsayılanlarıAyarla() {
if c.Günlükçü == nil {
c.Günlükçü = watermill.NopGünlükçü{}
}
}
// ...
Olay Grubu İşleyicisi
Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor, olay veri yolu tarafından alınan olayları hangi olay işleyicisinin işlemesi gerektiğini belirler.
// EventProcessor'den farklı olarak, EventGroupProcessor aynı abone örneğini paylaşan birden fazla işleyiciye izin verir.
type EventGroupProcessor struct {
// ...
Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic, grup olay işleyicilerine abone olmak için konuyu oluşturmak için kullanılır.
// İşlemci grupları kullanıldığında, bu seçenek gereklidir.
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor, GroupEventHandler için bir abone oluşturmak için kullanılır.
// Bu işlev her olay grubu için bir aboneliğin oluşturulmasına izin vererek bir kez çağrılır.
// Bir akıştan olayları sırayla işlemek istediğimizde çok faydalıdır.
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle, olayı işlemekten önce çağrılır.
// OnHandle, bir ara yazılım gibi: olayı işlemeden önce ve sonra ek mantık ekleyebilirsiniz.
//
// Bu nedenle, params.Handler.Handle() 'ı açıkça çağırmanız gerekmektedir.
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // İşlemeden önce mantık
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // İşlemden sonra mantık
// // (...)
//
// return err
// }
//
// Bu seçenek gereklidir.
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent, olayın tanımlı bir işleyiciye sahip olmaması durumunda bilinmeyen olayı kabul edip etmeyeceğini belirlemek için kullanılır.
AckOnUnknownEvent bool
// Marshaler, olayların kodlaması ve kod çözme için kullanılır.
// Bu gereklidir.
Marshaler CommandEventMarshaler
// Günlükçü örneği için kullanılan günlük.
// Sağlanmazsa, watermill.NopLogger kullanılacaktır.
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Olay Grubu İşleyici hakkında daha fazla bilgi edinin.
Olay İşleyici
Tam Kaynak Kodu: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler, YeniOlay tarafından tanımlanan olayları alır ve bunları Handle yöntemiyle işler.
// DDD kullanılıyorsa, olay işleyicisi toplamları değiştirebilir ve kalıcı hale getirebilir.
// Ayrıca işlem yöneticilerini, sage'leri veya sadece okuma modellerini oluşturabilir.
//
// Komut işleyicilerinin aksine, her olayın birden çok olay işleyicisi olabilir.
//
// Mesaj işleme sırasında, bir EventHandler örneği kullanın.
// Birden fazla olayı aynı anda geçirirken, Handle yöntemi birden çok kez aynı anda yürütülebilir.
// Bu nedenle, Handle yönteminin thread-safe olması gerekir!
type EventHandler interface {
// ...
Komut
Bir komut, belirli bir işlemi gerçekleştirmek için bir isteği temsil eden basit bir veri yapısıdır.
Komut Otobüsü
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus, komutları komut işleyicilere taşıyan bileşendir.
type CommandBus struct {
// ...
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic, komutları yayınlamak için konu oluşturmak için kullanılır.
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend, bir komut yayınlamadan önce çağrılır.
// *message.Message değiştirilebilir.
//
// Bu seçenek zorunlu değildir.
OnSend CommandBusOnSendFn
// Marshaler, komutların seri hale getirilmesi ve serileştirilmesi için kullanılır.
// Gerekli.
Marshaler CommandEventMarshaler
// Günlükçü örneği için kullanılan günlükçü.
// Sağlanmazsa, watermill.NopLogger kullanılacaktır.
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Komut İşleyici
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn, CommandHandler için abone oluşturmak için kullanılır.
// Her komut işleyici için ayrı özel abone oluşturmanıza olanak tanır.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic, komutlara abone olmak için konu oluşturmak için kullanılır.
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor, CommandHandler için abone oluşturmak için kullanılır.
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle, komut işlenmeden önce çağrılır.
// OnHandle, bir ara yazılım gibi çalışır: komut işlenmeden önce ve sonra ek mantık enjekte edebilirsiniz.
//
// Bu nedenle, komut işlemek için params.Handler.Handle() açıkça çağrılmalıdır.
// func(params CommandProcessorOnHandleParams) (err error) {
// // işlem öncesi mantık
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // işlem sonrası mantık
// // (...)
//
// return err
// }
//
// Bu seçenek zorunlu değildir.
OnHandle CommandProcessorOnHandleFn
// Marshaler, komutların seri hale getirilmesi ve serileştirilmesi için kullanılır.
// Gerekli.
Marshaler CommandEventMarshaler
// Günlükçü örneği için kullanılan günlükçü.
// Sağlanmazsa, watermill.NopLogger kullanılacaktır.
Logger watermill.LoggerAdapter
// Eğer true ise, CommandHandler hata döndürse bile CommandProcessor mesajları kabul edecektir.
// RequestReplyBackend null değilse ve yanıt gönderme başarısız olursa, mesaj yine de reddedilecektir.
//
// Uyarı: Bu seçeneği kullanmanız önerilmez (requestreply.NewCommandHandler veya requestreply.NewCommandHandlerWithResult ile kullanırken),
// çünkü yanıt gönderme başarısız olduğunda komutu kabul edebilir.
//
// requestreply kullanırken, requestreply.PubSubBackendConfig.AckCommandErrors kullanmalısınız.
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers, geriye dönük uyumluluk için kullanılır.
// NewCommandProcessor ile bir CommandProcessor oluşturulurken ayarlanır.
// Not: Lütfen NewCommandProcessorWithConfig'e geçiş yapın.
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Komut İşleyici
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler, NewCommand tarafından tanımlanan komutu alır ve Handle yöntemini kullanarak işler.
// DDD kullanılıyorsa, CommandHandler, birlikleri değiştirebilir ve kalıcı hale getirebilir.
//
// EventHandler'dan farklı olarak, her Komutun yalnızca bir CommandHandler'ı olabilir.
//
// Mesaj işleme sırasında, bir CommandHandler örneği kullanın.
// Eş zamanlı olarak birden fazla komut iletiliyorsa, Handle yöntemi birden fazla kez eş zamanlı olarak yürütülebilir.
// Bu nedenle, Handle yöntemi thread-safe olmalıdır!
type CommandHandler interface {
// ...
Komut ve Olay Yazarı
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler, komutları ve olayları Watermill mesajlarına ve bunun tersine dönüştürür.
// Komutun payload'ı []byte'a dönüştürülmelidir.
type CommandEventMarshaler interface {
// Marshal, komutu veya olayı Watermill mesajına dönüştürür.
Marshal(v interface{}) (*message.Message, error)
// Unmarshal, Watermill mesajını v komutu veya olayına çözümler.
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name, komutun veya olayın adını döndürür.
// Alınan komutun veya olayın işlememiz istediğimiz olanı olup olmadığını belirlemek için ad kullanılabilir.
Name(v interface{}) string
// NameFromMessage, Komutları veya olayları Watermill mesajlarına dönüştürdüğümüzde, gereksiz çözümlemeden kaçınmak için Ad yerine NameFromMessage kullanmalıyız.
NameFromMessage(msg *message.Message) string
}
// ...
Kullanım
Örnek Alan
Otelde oda rezervasyonlarıyla ilgilenen basit bir alan kullanılır.
Bu domain modelini göstermek için Event Storming sembolleri kullanacağız.
Sembol açıklaması:
- Mavi yapışkan notlar komutları temsil eder.
- Turuncu yapışkan notlar olayları temsil eder.
- Yeşil yapışkan notlar olaylardan asenkron olarak oluşturulan okuma modellerini temsil eder.
- Mor yapışkan notlar olaylar tarafından tetiklenen ve komutlar üreten politikaları temsil eder.
- Pembe yapışkan notlar sıkça karşılaşılan sorunlu alanları işaretler.
Alan basittir:
- Müşteriler, odaları rezerve edebilir.
-
Her bir oda rezerve edildiğinde, müşteri için bir şişe bira siparişi veririz (çünkü misafirlerimizi seviyoruz).
- Bazen biranın tükendiğini biliyoruz.
- Rezervasyona dayalı finansal raporlar üretiyoruz.
Komut Gönderme
Öncelikle, müşteri eylemlerini simüle etmemiz gerekiyor.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
Komut İşleyici
BookRoomHandler
, komutlarımızı işleyecek.
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler, BookRoom komutunu işleyen ve RoomBooked etkinliği yayınlayan bir komut işleyicisidir.
//
// CQRS'de bir komut bir işleyici tarafından işlenmelidir.
// Bu komutu işlemek için başka bir işleyici eklerken bir hata dönecektir.
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand, bu işleyicinin işlemesi gereken komutun türünü döndürür. Kesinlikle bir işaretçi olmalıdır.
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c her zaman `NewCommand` tarafından döndürülen türdür, bu nedenle tür güvencesi her zaman güvenlidir
cmd := c.(*BookRoom)
// Gerçek üretimde daha mantıklı bir şekilde hesaplanabilecek rastgele fiyatlandırma
price := (rand.Int63n(40) + 1) * 10
log.Printf(
"%s odası, %s tarafından %s tarihinden %s tarihine kadar rezerve edildi",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked, OrderBeerOnRoomBooked etkinlik işleyicisi tarafından işlenecek,
// ve ileride RoomBooked, birden çok etkinlik işleyicisi tarafından işlenebilir
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked, RoomBooked etkinliğini işleyen ve OrderBeer komutunu yayınlayan bir etkinlik işleyicisidir.
// ...
Etkinlik İşleyicileri
Daha önce belirtildiği gibi, her bir oda rezerve edildiğinde bir şişe bira sipariş etmek istiyoruz ("Oda rezerve edildiğinde" etiketiyle işaretlenmiştir). Bunu OrderBeer
komutunu kullanarak gerçekleştiriyoruz.
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked, RoomBooked etkinliğini işleyen ve OrderBeer komutunu yayınlayan bir etkinlik işleyicisidir.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// Bu isim, kuyruk adını oluşturmak için EventsSubscriberConstructor'a iletilir
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler, BookRoomHandler'a çok benzer. Tek farkı, bazen yeterli bira olmadığında bir hata döndürmesi ve komutun yeniden gönderilmesine neden olmasıdır. Tam uygulamayı [örnek kaynak kodunda](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc) bulabilirsiniz.
Olay İşleyici Grupları
Varsayılan olarak, her olay işleyicisinin ayrı bir abone örneği vardır. Bu yaklaşım, yalnızca bir olay türünün konuya gönderilmesi durumunda sorunsuz çalışır.
Konuda birden fazla olay türü bulunduğunda, iki seçeneğiniz vardır:
-
EventConfig.AckOnUnknownEvent
'itrue
olarak ayarlayabilirsiniz - bu, işleyiciler tarafından işlenmeyen tüm olayları kabul eder. - Olay işleyici grup mekanizmasını kullanabilirsiniz.
Olay gruplarını kullanabilmek için EventConfig
içinde GenerateHandlerGroupSubscribeTopic
ve GroupSubscriberConstructor
seçeneklerini ayarlamanız gerekmektedir.
Ardından, EventProcessor
üzerinde AddHandlersGroup
'u kullanabilirsiniz.
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"olaylar",
OdaRezerveEdildiğindeBiraSiparişiVer{komutBus},
YeniRezervasyonFinansalRapor(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BiraSiparişiVerildi) error {
logger.Info("Bira siparişi verildi", watermill.LogFields{
"oda_id": event.OdaId,
})
return nil
}),
)
if err != nil {
// ...
GenerateHandlerGroupSubscribeTopic
ve GroupSubscriberConstructor
her ikisi de grup adı hakkında bilgi alır ve bu bilgileri işlev parametreleri olarak alır.
Genel İşleyiciler
Watermill v1.3'ten itibaren, genel işleyiciler komutları ve olayları işlemek için kullanılabilir. Bu, çok sayıda komut/olayınız olduğunda ve her biri için bir işleyici oluşturmak istemediğinizde oldukça kullanışlıdır.
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BiraSiparişiVerildi) error {
logger.Info("Bira siparişi verildi", watermill.LogFields{
"oda_id": event.OdaId,
})
return nil
}),
// ...
Arka planda, bir EventHandler veya CommandHandler uygulaması oluşturur. Tüm işleyici türleri için uygundur.
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler, sağlanan işlev ve işlev parametrelerinden çıkarılan komut türüne dayalı yeni bir CommandHandler uygulaması oluşturur.
func NewCommandHandler[Komut any](
// ...
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler, sağlanan işlev ve işlev parametrelerinden çıkarılan olay türüne dayalı yeni bir EventHandler uygulaması oluşturur.
func NewEventHandler[T any](
// ...
Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler, sağlanan işlev ve işlev parametrelerinden çıkarılan olay türüne dayalı yeni bir GroupEventHandler uygulaması oluşturur.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
Olay İşleyicilerini Kullanarak Bir Okuma Modeli Oluşturma
Tam kaynak kod: github.com/ThreeDotsLabs/su-değirmeni/_örnekler/temel/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport, rezervasyonlardan ne kadar para kazanabileceğimizi hesaplayan bir okuma modelidir.
// Oluştuğunda RoomBooked olaylarını dinler.
//
// Bu uygulama basitçe belleğe yazma yapar. Üretim ortamında, bazı kalıcı depolama yöntemlerini kullanabilirsiniz.
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// Bu isim EventsSubscriberConstructor'a iletilir ve kuyruk adı üretmek için kullanılır
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// Handle aynı anda birden çok kez çağrılabilir, bu yüzden iş parçacığı güvenliği gereklidir.
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// Tam olarak bir kez teslimat semantiği sağlamayan Pub/Sub kullanıyorsak, mesajları özdeşleştirmemiz gerekir.
// GoChannel Pub/Sub tam olarak bir kez teslimat sağlar,
// ancak bu örneği diğer Pub/Sub uygulamaları için hazırlayalım.
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> $%d karşılığında oda rezerve edildi\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
Her Şeyi Bağlayın
Zaten CQRS uygulaması oluşturmak için gerekli tüm bileşenlere sahibiz.
Mesaj aracı olarak AMQP (RabbitMQ) kullanacağız: AMQP.
CQRS'nin altında, Watermill'in mesaj yönlendiricisini kullanır. Bu konuda bilgi sahibi değilseniz ve nasıl çalıştığını anlamak istiyorsanız, başlangıç kılavuzuna göz atmalısınız. Ayrıca standart mesajlaşma modelleri olan metrikler, zehirli mesaj kuyrukları, hız sınırlama, korelasyon ve her mesaj tabanlı uygulama tarafından kullanılan diğer araçları nasıl kullanacağınızı da gösterecektir. Bu araçlar zaten Watermill'in içine yerleştirilmiştir.
Şimdi CQRS'ye geri dönelim. Bildiğiniz gibi, CQRS komut veya olay otobüsleri, işlemciler ve benzeri gibi birden fazla bileşenden oluşur.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
Bu kadar. Çalıştırılabilir bir CQRS uygulamamız var.