CQRS-Mechanism
CQRS steht für "Command Query Responsibility Segregation". Es trennt die Verantwortung von Befehlen (Schreibanfragen) und Abfragen (Leseanfragen). Schreibanfragen und Leseanfragen werden von unterschiedlichen Objekten behandelt.
Das ist CQRS. Wir können die Datenspeicherung weiter trennen und separate Lese- und Schreibspeicher haben. Sobald dies erledigt ist, können viele Lese-Speicher optimiert werden, um verschiedene Arten von Abfragen oder viele abgegrenzte Kontexte abzudecken. Obwohl separate Lese-/Schreibspeicher oft das Thema von Diskussionen im Zusammenhang mit CQRS sind, ist es selbst nicht CQRS. CQRS ist lediglich die erste Trennung von Befehl und Abfrage.
Die cqrs
-Komponente bietet einige nützliche Abstraktionen, die auf Pub/Sub und Router aufbauen, um die CQRS-Muster zu implementieren.
Sie müssen nicht das gesamte CQRS implementieren. In der Regel wird nur der Ereignisteil der Komponente verwendet, um ereignisgesteuerte Anwendungen zu erstellen.
Bausteine
Ereignisse
Ereignisse repräsentieren etwas, das bereits geschehen ist. Ereignisse sind unveränderlich.
Ereignisbus
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus transportiert Ereignisse an Ereignis-Handler.
type EventBus struct {
// ...
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// GeneratePublishTopic wird verwendet, um den Themenamen für das Veröffentlichen von Ereignissen zu generieren.
GeneratePublishTopic GenerateEventPublishTopicFn
// OnPublish wird vor dem Senden des Ereignisses aufgerufen. Es kann *message.Message modifizieren.
//
// Diese Option ist nicht zwingend erforderlich.
OnPublish OnEventSendFn
// Marshaler wird zum Codieren und Decodieren von Ereignissen verwendet.
// Dies ist obligatorisch.
Marshaler CommandEventMarshaler
// Logger-Instanz zum Protokollieren. Wenn nicht bereitgestellt, wird watermill.NopLogger verwendet.
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Ereignisverarbeiter
Vollständiger Code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor wird verwendet, um festzustellen, welcher EventHandler die Ereignisse verarbeiten soll, die vom Ereignisbus empfangen werden.
type EventProcessor struct {
// ...
Vollständiger Code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
type EventProcessorConfig struct {
// GenerateSubscribeTopic wird verwendet, um das Thema für das Abonnieren von Ereignissen zu generieren.
// Wenn der Ereignisprozessor Handlergruppen verwendet, wird GenerateSubscribeTopic verwendet.
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor wird verwendet, um einen Abonnenten für den EventHandler zu erstellen.
//
// Diese Funktion wird für jede EventHandler-Instanz einmal aufgerufen.
// Wenn Sie einen Abonnenten für mehrere Handler wiederverwenden möchten, verwenden Sie GroupEventProcessor.
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle wird vor der Ereignisverarbeitung aufgerufen.
// OnHandle funktioniert ähnlich wie Middleware: Sie können zusätzliche Logik vor und nach der Ereignisverarbeitung einfügen.
//
// Daher müssen Sie explizit params.Handler.Handle() aufrufen, um das Ereignis zu verarbeiten.
//
// func(params EventProcessorOnHandleParams) (err error) {
// // Logik vor der Verarbeitung
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Logik nach der Verarbeitung
// // (...)
// return err
// }
//
// Diese Option ist nicht obligatorisch.
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent wird verwendet, um zu bestimmen, ob die Nachricht bestätigt werden soll, wenn für das Ereignis kein definierter Handler vorhanden ist.
AckOnUnknownEvent bool
// Marshaler wird verwendet, um Ereignisse zu marshalen und unmarshalen.
// Erforderlich.
Marshaler CommandEventMarshaler
// Logger-Instanz zum Protokollieren.
// Falls nicht bereitgestellt, wird watermill.NopLogger verwendet.
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers dient der Aufrechterhaltung der Abwärtskompatibilität.
// Dieser Wert wird bei der Erstellung des EventProcessors mithilfe von NewEventProcessor festgelegt.
// Veraltet: wechseln Sie zu NewEventProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
#### Ereignisgruppenprozessor
Vollständiger Quellcode: [github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go](https://github.com/ThreeDotsLabs/watermill/tree/master/components/cqrs/event_processor_group.go#L104)
```go
// ...
// EventGroupProcessor bestimmt, welcher Ereignisprozessor die Ereignisse verarbeiten soll, die vom Ereignisbus empfangen werden.
// Im Vergleich zu EventProcessor ermöglicht EventGroupProcessor mehreren Prozessoren, dieselbe Abonnenteninstanz zu teilen.
type EventGroupProcessor struct {
// ...
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic wird verwendet, um das Thema für das Abonnieren von Gruppenereignisprozessoren zu generieren.
// Diese Option ist für EventProcessor erforderlich, wenn Prozessorgruppen verwendet werden.
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor wird verwendet, um einen Abonnenten für GroupEventHandler zu erstellen.
// Diese Funktion wird einmal pro Ereignisgruppe aufgerufen, was es ermöglicht, für jede Gruppe eine Abonnement zu erstellen.
// Das ist sehr nützlich, wenn Ereignisse aus einem Stream in bestimmter Reihenfolge verarbeiten möchten.
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle wird vor der Handhabung des Ereignisses aufgerufen.
// OnHandle ist ähnlich wie Middleware: Sie können zusätzliche Logik vor und nach der Handhabung des Ereignisses einfügen.
//
// Daher müssen Sie explizit params.Handler.Handle() aufrufen, um das Ereignis zu handhaben.
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // Logik vor der Handhabung
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Logik nach der Handhabung
// // (...)
//
// return err
// }
//
// Diese Option ist nicht erforderlich.
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent wird verwendet, um zu bestimmen, ob bei einem Ereignis ohne definierten Handler bestätigt werden soll.
AckOnUnknownEvent bool
// Marshaler wird für die Codierung und Dekodierung von Ereignissen verwendet.
// Das ist erforderlich.
Marshaler CommandEventMarshaler
// Logger-Instanz wird für das Protokollieren verwendet.
// Wenn nicht bereitgestellt, wird watermill.NopLogger verwendet.
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Weitere Informationen zum Ereignisgruppenprozessor.
Ereignishandler
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler empfängt Ereignisse, die durch NewEvent definiert sind, und verarbeitet sie mit seiner Handle-Methode.
// Bei Verwendung von DDD kann der Ereignishandler Aggregate ändern und persistieren.
// Es kann auch Prozess-Manager, Sagas oder einfach Lese-Modelle erstellen.
//
// Im Gegensatz zu Befehls-Handlern kann jedes Ereignis mehrere Ereignis-Handler haben.
//
// Verwenden Sie während der Nachrichtenverarbeitung eine Ereignis-Handler-Instanz.
// Wenn mehrere Ereignisse gleichzeitig übergeben werden, kann die Handle-Methode mehrmals gleichzeitig ausgeführt werden.
// Daher muss die Handle-Methode threadsicher sein!
type EventHandler interface {
// ...
Befehl
Ein Befehl ist eine einfache Datenstruktur, die eine Anfrage zur Durchführung einer bestimmten Operation darstellt.
Befehlsbus
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus ist das Komponente, die Befehle zu Befehlsverarbeitern transportiert.
type CommandBus struct {
// ...
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic wird verwendet, um das Thema für die Veröffentlichung von Befehlen zu generieren.
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend wird vor der Veröffentlichung eines Befehls aufgerufen.
// *message.Message kann geändert werden.
//
// Diese Option ist nicht obligatorisch.
OnSend CommandBusOnSendFn
// Marshaler wird für das Serialisieren und Deserialisieren von Befehlen verwendet.
// Erforderlich.
Marshaler CommandEventMarshaler
// Logger-Instanz für Protokollierung.
// Wenn nicht bereitgestellt, wird watermill.NopLogger verwendet.
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Befehlsprozessor
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn wird verwendet, um einen Abonnenten für den Befehlsverarbeiter zu erstellen.
// Es ermöglicht Ihnen, für jeden Befehlsverarbeiter einen separaten benutzerdefinierten Abonnenten zu erstellen.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic wird verwendet, um das Thema für das Abonnieren von Befehlen zu generieren.
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor wird verwendet, um einen Abonnenten für den Befehlsverarbeiter zu erstellen.
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle wird vor dem Behandeln des Befehls aufgerufen.
// OnHandle funktioniert wie ein Middleware: Sie können zusätzliche Logik vor und nach der Behandlung des Befehls einfügen.
//
// Aus diesem Grund müssen Sie params.Handler.Handle() explizit aufrufen, um den Befehl zu behandeln.
// func(params CommandProcessorOnHandleParams) (err error) {
// // Logik vor der Behandlung
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // Logik nach der Behandlung
// // (...)
//
// return err
// }
//
// Diese Option ist nicht erforderlich.
OnHandle CommandProcessorOnHandleFn
// Marshaler wird für die Serialisierung und Deserialisierung von Befehlen verwendet.
// Erforderlich.
Marshaler CommandEventMarshaler
// Logger-Instanz für die Protokollierung.
// Wenn nicht bereitgestellt, wird watermill.NopLogger verwendet.
Logger watermill.LoggerAdapter
// Wenn true, wird der Befehlsverarbeiter Nachrichten bestätigen, auch wenn der Befehlsverarbeiter einen Fehler zurückgibt.
// Wenn RequestReplyBackend nicht null ist und das Senden der Antwort fehlschlägt, wird die Nachricht dennoch abgelehnt.
//
// Warnung: Es wird nicht empfohlen, diese Option zu verwenden, wenn das Request-Reply-Modul (requestreply.NewCommandHandler oder requestreply.NewCommandHandlerWithResult) verwendet wird,
// da es den Befehl bestätigen kann, wenn das Senden der Antwort fehlschlägt.
//
// Bei Verwendung von Request-Reply sollte requestreply.PubSubBackendConfig.AckCommandErrors verwendet werden.
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers wird für die Abwärtskompatibilität verwendet.
// Es wird beim Erstellen eines Befehlsverarbeiters mit NewCommandProcessor festgelegt.
// Veraltet: Bitte migrieren Sie zu NewCommandProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Befehlsverarbeiter
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler empfängt den Befehl, der durch NewCommand definiert ist, und verarbeitet ihn mit der Handle-Methode.
// Wenn DDD verwendet wird, kann CommandHandler Aggregate modifizieren und dauerhaft speichern.
//
// Im Gegensatz zu EventHandler kann ein Befehl nur einen CommandHandler haben.
//
// Während der Nachrichtenverarbeitung wird eine Instanz von CommandHandler verwendet.
// Wenn mehrere Befehle gleichzeitig übermittelt werden, kann die Handle-Methode mehrmals parallel ausgeführt werden.
// Die Handle-Methode muss daher threadsicher sein!
type CommandHandler interface {
// ...
Befehls- und Ereignis-Marshaler
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler wandelt Befehle und Ereignisse in Watermill-Nachrichten und umgekehrt um.
// Das Nutzlast des Befehls muss in []bytes umgewandelt werden.
type CommandEventMarshaler interface {
// Marshal wandelt den Befehl oder das Ereignis in eine Watermill-Nachricht um.
Marshal(v interface{}) (*message.Message, error)
// Unmarshal entschlüsselt die Watermill-Nachricht in den v Befehl oder das Ereignis.
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name gibt den Namen des Befehls oder Ereignisses zurück.
// Der Name kann verwendet werden, um festzustellen, ob der empfangene Befehl oder das Ereignis das ist, das wir verarbeiten möchten.
Name(v interface{}) string
// NameFromMessage gibt den Namen des Befehls oder Ereignisses aus der Watermill-Nachricht zurück (die von Marshal generiert wurde).
//
// Wenn wir Befehle oder Ereignisse in Watermill-Nachrichten umgewandelt haben, sollten wir NameFromMessage anstelle von Name verwenden, um unnötige Entschlüsselung zu vermeiden.
NameFromMessage(msg *message.Message) string
}
// ...
Nutzung
Beispiel-Domäne
Verwendung einer einfachen Domäne, die für die Verwaltung von Zimmerreservierungen in einem Hotel verantwortlich ist.
Wir verwenden Event Storming-Symbole, um das Modell dieser Domäne zu präsentieren.
Symbollegende:
- Blaue Haftnotizen sind Befehle
- Orange Haftnotizen sind Ereignisse
- Grüne Haftnotizen sind asynchron generierte Lese-Modelle aus Ereignissen
- Lila Haftnotizen sind durch Ereignisse ausgelöste Richtlinien, die Befehle generieren
- Rosa Haftnotizen sind Hotspots; wir markieren Bereiche, die oft auf Probleme stoßen
Die Domäne ist einfach:
- Kunden können Zimmer buchen.
-
Immer wenn ein Zimmer gebucht wird, bestellen wir eine Flasche Bier für den Kunden (weil wir unsere Gäste lieben).
- Wir wissen, dass manchmal das Bier ausgeht.
- Wir erstellen einen Finanzbericht basierend auf der Buchung.
Senden von Befehlen
Zunächst müssen wir Kundenaktionen simulieren.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
Befehlshandler
Der BookRoomHandler
wird unsere Befehle verarbeiten.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler ist ein Befehlshandler, der den Befehl BookRoom verarbeitet und das Ereignis RoomBooked auslöst.
//
// Im CQRS muss ein Befehl von einem Handler verarbeitet werden.
// Beim Hinzufügen eines weiteren Handlers, um diesen Befehl zu verarbeiten, wird ein Fehler zurückgegeben.
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand gibt den Typ des Befehls zurück, den dieser Handler verarbeiten soll. Es muss ein Zeiger sein.
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c ist immer der Typ, der von `NewCommand` zurückgegeben wird, daher ist die Typassertion immer sicher
cmd := c.(*BookRoom)
// Einige zufällige Preise, die in der tatsächlichen Produktion auf vernünftigere Weise berechnet werden können
preis := (rand.Int63n(40) + 1) * 10
log.Printf(
"Zimmer gebucht %s, von %s bis %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked wird vom Ereignis-Handler OrderBeerOnRoomBooked verarbeitet,
// und in Zukunft kann RoomBooked von mehreren Ereignis-Handlern verarbeitet werden
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Preis: preis,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked ist ein Ereignis-Handler, der das Ereignis RoomBooked verarbeitet und den Befehl OrderBeer auslöst.
// ...
Ereignis-Handler
Wie zuvor erwähnt, wollen wir jedes Mal, wenn ein Zimmer gebucht wird (gekennzeichnet mit "Wenn das Zimmer gebucht wird"), eine Flasche Bier bestellen. Dies erreichen wir, indem wir den Befehl OrderBeer
verwenden.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked ist ein Ereignis-Handler, der das Ereignis RoomBooked verarbeitet und den Befehl OrderBeer auslöst.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// Dieser Name wird an den EventsSubscriberConstructor übergeben, um den Warteschlangennamen zu generieren
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler ist ein Befehlshandler, der den Befehl OrderBeer verarbeitet und das Ereignis BeerOrdered auslöst.
// ...
OrderBeerHandler
ist sehr ähnlich zu BookRoomHandler
. Der einzige Unterschied besteht darin, dass er manchmal einen Fehler zurückgibt, wenn kein ausreichendes Bier vorhanden ist, was dazu führt, dass der Befehl erneut ausgegeben wird. Die vollständige Implementierung finden Sie im Beispielquellcode.
Ereignisbehandlungsgruppen
Standardmäßig verfügt jeder Ereignisbehandler über eine separate Abonnenteninstanz. Dieser Ansatz funktioniert gut, wenn nur ein Ereignistyp an das Thema gesendet wird.
Im Fall mehrerer Ereignistypen zum Thema gibt es zwei Optionen:
- Sie können
EventConfig.AckOnUnknownEvent
auftrue
setzen - dies bestätigt alle Ereignisse, die nicht von den Handlern verarbeitet werden. - Sie können den Mechanismus der Ereignisbehandlergruppe verwenden.
Um Ereignisgruppen zu verwenden, müssen Sie die Optionen GenerateHandlerGroupSubscribeTopic
und GroupSubscriberConstructor
in EventConfig
festlegen.
Dann können Sie AddHandlersGroup
auf dem EventProcessor
verwenden.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"ereignisse",
OrderBeerOnRoomBooked{commandBus},
NeuerBuchungsFinanzbericht(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Bier bestellt", watermill.LogFields{
"zimmer_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
Sowohl GenerateHandlerGroupSubscribeTopic
als auch GroupSubscriberConstructor
erhalten Informationen über den Gruppennamen als Funktionsparameter.
Generische Handler
Ab Watermill v1.3 können generische Handler verwendet werden, um Befehle und Ereignisse zu verarbeiten. Dies ist sehr nützlich, wenn Sie eine große Anzahl von Befehlen/Ereignissen haben und nicht für jeden einen Handler erstellen möchten.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Bier bestellt", watermill.LogFields{
"zimmer_id": event.RoomId,
})
return nil
}),
// ...
Im Hintergrund erstellt es eine EventHandler- oder CommandHandler-Implementierung. Es ist für alle Arten von Handlern geeignet.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler erstellt eine neue CommandHandler-Implementierung basierend auf der bereitgestellten Funktion und dem abgeleiteten Befehlstyp aus den Funktionsparametern.
func NewCommandHandler[Befehl any](
// ...
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler erstellt eine neue EventHandler-Implementierung basierend auf der bereitgestellten Funktion und dem abgeleiteten Ereignistyp aus den Funktionsparametern.
func NewEventHandler[T any](
// ...
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler erstellt eine neue GroupEventHandler-Implementierung basierend auf der bereitgestellten Funktion und dem abgeleiteten Ereignistyp aus den Funktionsparametern.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
Aufbau eines Lesemodells mithilfe von Ereignisbehandlern
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport ist ein Lesemodell, das berechnet, wie viel Geld wir aus Buchungen verdienen können.
// Es hört auf Raumgebuchte Ereignisse, wenn sie auftreten.
//
// Diese Implementierung schreibt einfach in den Speicher. In einer Produktionsumgebung könnten Sie eine Form von persistenter Speicherung verwenden.
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// Dieser Name wird an den EventsSubscriberConstructor übergeben und zur Generierung des Warteschlangennamens verwendet
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// Handle kann gleichzeitig aufgerufen werden, daher ist Thread-Sicherheit erforderlich.
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// Wenn ein Pub/Sub verwendet wird, das keine genau-einmalige Auslieferungsgarantie bietet, müssen wir Nachrichten deduplizieren.
// GoChannel Pub/Sub bietet genau-einmalige Auslieferung,
// aber lassen Sie uns dieses Beispiel auf andere Pub/Sub-Implementierungen vorbereiten.
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> Zimmer für $%d gebucht\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
Verbindung aller Komponenten
Wir verfügen bereits über alle erforderlichen Komponenten, um eine CQRS-Anwendung zu erstellen.
Wir werden AMQP (RabbitMQ) als unseren Message-Broker verwenden.
Im Hintergrund verwendet CQRS den Message-Router von Watermill. Wenn Sie damit nicht vertraut sind und verstehen möchten, wie er funktioniert, sollten Sie sich das Einführungshandbuch ansehen. Dort erfahren Sie auch, wie Sie einige standardmäßige Messaging-Muster wie Metriken, Poison-Message-Queues, Rate-Limiting, Korrelation und andere Tools verwenden können, die von jeder nachrichtengesteuerten Anwendung verwendet werden. Diese Tools sind bereits in Watermill integriert.
Kehren wir zu CQRS zurück. Wie Sie bereits wissen, besteht CQRS aus mehreren Komponenten wie Befehls- oder Event-Bussen, Prozessoren usw.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
Das ist es. Wir haben eine ausführbare CQRS-Anwendung.