Mekanisme CQRS

CQRS adalah singkatan dari "Command Query Responsibility Segregation". Ini memisahkan tanggung jawab perintah (permintaan tulis) dan kueri (permintaan baca). Permintaan tulis dan permintaan baca ditangani oleh objek yang berbeda.

Inilah CQRS. Kita dapat memisahkan penyimpanan data, memiliki penyimpanan baca dan tulis yang terpisah. Setelah ini dilakukan, mungkin ada banyak penyimpanan baca yang dioptimalkan untuk menangani berbagai jenis kueri atau merentang banyak konteks terikat. Meskipun penyimpanan baca/tulis yang terpisah sering menjadi topik diskusi terkait dengan CQRS, itu bukanlah CQRS itu sendiri. CQRS hanyalah pemisahan pertama dari perintah dan kueri.

Diagram Arsitektur CQRS

Komponen cqrs menyediakan beberapa abstraksi yang berguna, dibangun di atas Pub/Sub dan Router, untuk membantu menerapkan pola CQRS.

Anda tidak perlu menerapkan seluruh CQRS. Biasanya, hanya bagian acara dari komponen ini digunakan untuk membangun aplikasi yang didorong oleh acara.

Blok Bangunan

Acara

Acara mewakili sesuatu yang telah terjadi. Acara bersifat tidak berubah.

Bus Acara

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBus mengangkut acara ke penangan acara.
type EventBus struct {
// ...

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic digunakan untuk menghasilkan nama topik untuk memublikasikan acara.
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish dipanggil sebelum mengirimkan acara. Ini dapat memodifikasi *message.Message.
    //
    // Opsi ini tidak wajib.
    OnPublish OnEventSendFn

    // Marshaler digunakan untuk mengkode dan mendekode acara.
    // Ini wajib.
    Marshaler CommandEventMarshaler

    // Contoh logger untuk logging. Jika tidak disediakan, watermill.NopLogger digunakan.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Pemroses Peristiwa

Kode lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor digunakan untuk menentukan EventHandler yang harus menangani peristiwa yang diterima dari bus peristiwa.
type EventProcessor struct {
// ...

Kode lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic digunakan untuk menghasilkan topik untuk berlangganan peristiwa.
	// Jika pemroses peristiwa menggunakan grup handler, maka GenerateSubscribeTopic digunakan.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor digunakan untuk membuat seorang pelanggan untuk EventHandler.
	// Fungsi ini dipanggil sekali untuk setiap instance EventHandler.
	// Jika Anda ingin menggunakan kembali seorang pelanggan untuk beberapa handler, gunakan GroupEventProcessor.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle dipanggil sebelum menangani peristiwa.
	// OnHandle bekerja mirip dengan middleware: Anda dapat menyisipkan logika tambahan sebelum dan setelah menangani peristiwa.
	//
	// Oleh karena itu, Anda perlu secara eksplisit memanggil params.Handler.Handle() untuk menangani peristiwa.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // Logika sebelum penanganan
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // Logika setelah penanganan
	//      //  (...)

	//      return err
	//  }
	//
	// Opsi ini tidak wajib.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent digunakan untuk menentukan apakah pesan seharusnya diakui ketika peristiwa tidak memiliki handler yang ditentukan.
	AckOnUnknownEvent bool

	// Marshaler digunakan untuk marshaling dan unmarshaling peristiwa.
	// Diperlukan.
	Marshaler CommandEventMarshaler

	// Instance Logger untuk logging.
	// Jika tidak disediakan, watermill.NopLogger akan digunakan.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers adalah untuk mempertahankan kompatibilitas mundur.
	// Nilai ini akan diatur saat membuat EventProcessor menggunakan NewEventProcessor.
	// Diperbolehkan: bermigrasi ke NewEventProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Pemroses Kelompok Acara

Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor menentukan processor acara mana yang harus menangani acara yang diterima dari bus acara.
// Dibandingkan dengan EventProcessor, EventGroupProcessor memungkinkan beberapa processor untuk berbagi instance pelanggan yang sama.
type EventGroupProcessor struct {
// ...

Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic digunakan untuk menghasilkan topik untuk berlangganan processor acara grup.
	// Opsi ini diperlukan untuk EventProcessor ketika menggunakan grup processor.
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor digunakan untuk membuat pelanggan untuk GroupEventHandler.
	// Fungsi ini dipanggil sekali per grup acara - memungkinkan langganan dibuat untuk setiap grup.
	// Sangat berguna ketika kita ingin menangani acara dari stream secara berurutan.
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle dipanggil sebelum menangani acara.
	// OnHandle mirip dengan middleware: Anda dapat menyisipkan logika tambahan sebelum dan setelah menangani acara.
	//
	// Oleh karena itu, Anda perlu secara eksplisit memanggil params.Handler.Handle() untuk menangani acara.
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // Logika sebelum penanganan
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // Logika setelah penanganan
	//     //  (...)
	//
	//     return err
	// }
	//
	// Opsi ini tidak diperlukan.
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent digunakan untuk menentukan apakah akan mengakui jika acara tidak memiliki penangan yang ditentukan.
	AckOnUnknownEvent bool

	// Marshaler digunakan untuk encoding dan decoding acara.
	// Ini diperlukan.
	Marshaler CommandEventMarshaler

	// Instansi Logger digunakan untuk logging.
	// Jika tidak disediakan, watermill.NopLogger akan digunakan.
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Pelajari lebih lanjut tentang Pemroses Kelompok Acara.

Penangan Acara

Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler menerima acara yang ditentukan oleh NewEvent dan menanganinya menggunakan metodenya, Handle.
// Jika menggunakan DDD, penangan acara dapat memodifikasi dan menyimpan agregat.
// Ini juga dapat memanggil manajer proses, saga, atau hanya membangun model baca.
//
// Berbeda dengan penangan perintah, setiap acara dapat memiliki beberapa penangan acara.
//
// Selama penanganan pesan, gunakan satu instance EventHandler.
// Saat melewati beberapa acara secara bersamaan, metode Handle dapat dieksekusi beberapa kali secara bersamaan.
// Oleh karena itu, metode Handle perlu aman untuk digunakan secara bersamaan!
type EventHandler interface {
// ...

Perintah

Sebuah perintah adalah struktur data sederhana yang mewakili permintaan untuk melakukan suatu operasi.

Command Bus

Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus adalah komponen yang mengangkut perintah ke penangan perintah.
type CommandBus struct {
// ...

Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic digunakan untuk menghasilkan topik untuk memublikasikan perintah.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend dipanggil sebelum memublikasikan sebuah perintah.
	// *message.Message dapat dimodifikasi.
	//
	// Opsi ini tidak wajib.
	OnSend CommandBusOnSendFn

	// Marshaler digunakan untuk men-serialize dan de-serialize perintah.
	// Diperlukan.
	Marshaler CommandEventMarshaler

	// Logger instansi yang digunakan untuk logging.
	// Jika tidak disediakan, watermill.NopLogger akan digunakan.
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Command Processor

Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn digunakan untuk membuat subscriber untuk CommandHandler.
// Ini memungkinkan Anda untuk membuat subscriber kustom terpisah untuk setiap penangan perintah.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic digunakan untuk menghasilkan topik untuk berlangganan perintah.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor digunakan untuk membuat subscriber untuk CommandHandler.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle dipanggil sebelum menangani perintah.
	// OnHandle berfungsi seperti middleware: Anda dapat menyisipkan logika tambahan sebelum dan sesudah menangani perintah.
	//
	// Karena ini, Anda perlu memanggil params.Handler.Handle() secara eksplisit untuk menangani perintah.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // logika sebelum penanganan
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // logika setelah penanganan
	//      // (...)
	//
	//      return err
	//  }
	//
	// Opsi ini tidak wajib.
	OnHandle CommandProcessorOnHandleFn

	// Marshaler digunakan untuk serialisasi dan deserialisasi perintah.
	// Diperlukan.
	Marshaler CommandEventMarshaler

	// Logger instansi untuk logging.
	// Jika tidak disediakan, watermill.NopLogger akan digunakan.
	Logger watermill.LoggerAdapter

	// Jika true, CommandProcessor akan mengakui pesan bahkan jika CommandHandler mengembalikan kesalahan.
	// Jika RequestReplyBackend tidak null dan pengiriman balasan gagal, pesan masih akan ditolak.
	//
	// Peringatan: Tidak disarankan menggunakan opsi ini ketika menggunakan komponen requestreply (requestreply.NewCommandHandler atau requestreply.NewCommandHandlerWithResult),
	// karena dapat mengakui perintah ketika pengiriman balasan gagal.
	//
	// Saat menggunakan requestreply, Anda harus menggunakan requestreply.PubSubBackendConfig.AckCommandErrors.
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers digunakan untuk kompatibilitas mundur.
	// Ini diatur ketika membuat CommandProcessor dengan NewCommandProcessor.
	// Didepresiasi: harap beralih ke NewCommandProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Pemroses Perintah

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler menerima perintah yang didefinisikan oleh NewCommand dan mengaturnya menggunakan metode Handle.
// Jika menggunakan DDD, CommandHandler dapat memodifikasi dan menyimpan agregat.
//
// Berbeda dengan EventHandler, setiap Command hanya dapat memiliki satu CommandHandler.
//
// Selama penanganan pesan, gunakan satu instansi dari CommandHandler.
// Ketika beberapa perintah dikirim secara bersamaan, metode Handle dapat dieksekusi beberapa kali secara bersamaan.
// Oleh karena itu, metode Handle perlu aman dalam thread!
type CommandHandler interface {
// ...

Marshaler Perintah dan Peristiwa

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler memarsel perintah dan peristiwa ke dalam pesan Watermill, dan sebaliknya.
// Payload dari perintah perlu diparsel menjadi []byte.
type CommandEventMarshaler interface {
	// Marshal memarsel perintah atau peristiwa ke dalam pesan Watermill.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal mendekode pesan Watermill menjadi perintah atau peristiwa v.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name mengembalikan nama perintah atau peristiwa.
	// Nama dapat digunakan untuk menentukan apakah perintah atau peristiwa yang diterima adalah yang ingin kita proses.
	Name(v interface{}) string

	// NameFromMessage mengembalikan nama perintah atau peristiwa dari pesan Watermill (dihasilkan oleh Marshal).
	//
	// Ketika kita memiliki perintah atau peristiwa yang diparsel menjadi pesan Watermill, kita sebaiknya menggunakan NameFromMessage daripada Name untuk menghindari dekode yang tidak perlu.
	NameFromMessage(msg *message.Message) string
}
// ...

Penggunaan

Contoh Domain

Menggunakan domain sederhana yang bertanggung jawab untuk penanganan pemesanan kamar di sebuah hotel.

Kami akan menggunakan simbol Event Storming untuk menampilkan model domain ini.

Legenda simbol:

  • Sticky note biru adalah perintah
  • Sticky note oranye adalah peristiwa
  • Sticky note hijau adalah model baca yang dihasilkan secara asinkron dari peristiwa
  • Sticky note ungu adalah kebijakan yang dipicu oleh peristiwa dan menghasilkan perintah
  • Sticky note pink adalah hotspot; kami menandai area yang sering mengalami masalah

CQRS Event Storming

Domain ini sederhana:

  • Pelanggan dapat memesan kamar.
  • Setiap kali kamar dipesan, kita memesan botol bir untuk pelanggan (karena kami menyukai tamu kami).
    • Kami tahu bahwa kadang birnya habis.
  • Kami menghasilkan laporan keuangan berdasarkan pemesanan.

Mengirim Perintah

Pertama, kita perlu mensimulasikan tindakan pelanggan.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		pesanPesanKamar := &PesanKamar{
			IdKamar:    fmt.Sprintf("%d", i),
			NamaTamu: "John",
			TanggalMulai: tanggalMulai,
			TanggalSelesai: tanggalSelesai,
		}
		if err := commandBus.Send(context.Background(), pesanPesanKamar); err != nil {
			panic(err)
		}
// ...

Penanganan Perintah

BookRoomHandler akan menangani perintah-perintah kita.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler adalah penangan perintah yang memproses perintah BookRoom dan mengeluarkan acara RoomBooked.
//
// Pada CQRS, sebuah perintah harus diproses oleh sebuah penangan.
// Ketika menambahkan penangan lain untuk menangani perintah ini, akan dikembalikan sebuah kesalahan.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand mengembalikan tipe perintah yang seharusnya diproses oleh penangan ini. Perintah harus berupa pointer.
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c selalu merupakan tipe yang dikembalikan oleh `NewCommand`, jadi asersi tipe selalu aman
	cmd := c.(*BookRoom)

	// Beberapa penentuan harga secara acak, yang mungkin dihitung dengan cara yang lebih masuk akal dalam produksi sebenarnya
	harga := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"Telah dipesan %s, dari %s hingga %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked akan ditangani oleh penangan acara OrderBeerOnRoomBooked,
	// dan di masa depan, RoomBooked dapat ditangani oleh beberapa penangan acara
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         harga,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked adalah penangan acara yang memproses acara RoomBooked dan mengeluarkan perintah OrderBeer.
// ...

Penangan Acara

Seperti yang disebutkan sebelumnya, kita ingin memesan botol bir setiap kali sebuah kamar dipesan (ditandai dengan "Ketika kamar dipesan"). Kita mencapai ini dengan menggunakan perintah OrderBeer.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked adalah penangan acara yang memproses acara RoomBooked dan mengeluarkan perintah OrderBeer.
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// Nama ini akan disampaikan ke EventsSubscriberConstructor untuk menghasilkan nama antrian
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler sangat mirip dengan `BookRoomHandler`. Satu-satunya perbedaan adalah kadang-kadang mengembalikan kesalahan ketika bir tidak cukup, sehingga perintah diulang. Anda dapat menemukan implementasi lengkapnya di [kode sumber contoh](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc).

Kelompok Penanganan Acara

Secara default, setiap penangan acara memiliki instance langganan yang terpisah. Pendekatan ini berfungsi dengan baik jika hanya satu jenis acara yang dikirimkan ke topik.

Pada kasus beberapa jenis acara pada topik, ada dua opsi:

  1. Anda dapat mengatur EventConfig.AckOnUnknownEvent menjadi true - ini akan mengakui semua acara yang tidak ditangani oleh penangan.
  2. Anda dapat menggunakan mekanisme kelompok penangan acara.

Untuk menggunakan kelompok acara, Anda perlu mengatur opsi GenerateHandlerGroupSubscribeTopic dan GroupSubscriberConstructor pada EventConfig.

Kemudian, Anda dapat menggunakan AddHandlersGroup pada EventProcessor.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},
		NewBookingsFinancialReport(),
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

Baik GenerateHandlerGroupSubscribeTopic maupun GroupSubscriberConstructor menerima informasi tentang nama kelompok sebagai parameter fungsi.

Penangan Generik

Mulai dari Watermill v1.3, penangan generik dapat digunakan untuk menangani perintah dan acara. Ini sangat berguna ketika Anda memiliki jumlah perintah/acara yang besar dan tidak ingin membuat penangan untuk masing-masingnya.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

Di balik layar, ini membuat implementasi EventHandler atau CommandHandler. Ini cocok untuk semua jenis penangan.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler membuat implementasi CommandHandler baru berdasarkan fungsi yang disediakan dan tipe perintah yang disimpulkan dari parameter fungsi.
func NewCommandHandler[Command any](
// ...

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler membuat implementasi EventHandler baru berdasarkan fungsi yang disediakan dan tipe acara yang disimpulkan dari parameter fungsi.
func NewEventHandler[T any](
// ...

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler membuat implementasi GroupEventHandler baru berdasarkan fungsi yang disediakan dan tipe acara yang disimpulkan dari parameter fungsi.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

Membangun Model Baca Menggunakan Penangan Acara

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport adalah model baca yang menghitung berapa banyak uang yang dapat kita hasilkan dari pemesanan.
// Ini mendengarkan acara RoomBooked ketika terjadi.
//
// Implementasi ini hanya menulis ke memori. Di lingkungan produksi, Anda mungkin menggunakan beberapa bentuk penyimpanan persisten.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// Nama ini dikirimkan ke EventsSubscriberConstructor dan digunakan untuk menghasilkan nama antrian
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// Handle mungkin dipanggil secara konkuren, jadi perlindungan thread diperlukan.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// Saat menggunakan Pub/Sub yang tidak memberikan semantik pengiriman tepat sekali, kita perlu menghilangkan pesan ganda.
	// GoChannel Pub/Sub memberikan pengiriman tepat sekali,
	// tetapi mari siapkan contoh ini untuk implementasi Pub/Sub lainnya.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> Kamar dipesan seharga $%d\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

Menghubungkan Semua Hal

Kita sudah memiliki semua komponen yang diperlukan untuk membangun aplikasi CQRS.

Kami akan menggunakan AMQP (RabbitMQ) sebagai pialang pesan: AMQP.

Di balik layar, CQRS menggunakan router pesan Watermill. Jika Anda tidak familiar dengan ini dan ingin memahami bagaimana ini bekerja, Anda harus melihat panduan memulai. Ini juga akan menunjukkan kepada Anda bagaimana menggunakan beberapa pola pengiriman pesan standar seperti metrik, antrian pesan beracun, pembatasan laju, korelasi, dan alat lain yang digunakan oleh setiap aplikasi berbasis pesan. Alat-alat ini sudah terintegrasi ke dalam Watermill.

Mari kembali ke CQRS. Seperti yang Anda ketahui, CQRS terdiri dari beberapa komponen seperti bus perintah atau peristiwa, prosesor, dan sebagainya.

Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

Itu dia. Kami memiliki aplikasi CQRS yang dapat dijalankan.