Mécanisme CQRS
CQRS signifie "Command Query Responsibility Segregation". Il sépare la responsabilité de la commande (requêtes d'écriture) et de la requête (requêtes de lecture). Les requêtes d'écriture et de lecture sont gérées par des objets différents.
C'est cela le CQRS. Nous pouvons également séparer le stockage des données, avec des espaces de stockage distincts pour la lecture et l'écriture. Une fois ceci fait, il peut y avoir de nombreux espaces de stockage pour la lecture optimisés pour gérer différents types de requêtes ou couvrant de nombreux contextes délimités. Bien que le stockage de lecture/écriture séparé soit souvent le sujet de discussions liées au CQRS, ce n'est pas le CQRS lui-même. Le CQRS n'est que la première séparation de la commande et de la requête.
Le composant cqrs
fournit certaines abstractions utiles, construites au-dessus de Pub/Sub et Router, pour aider à implémenter le modèle CQRS.
Vous n'avez pas besoin d'implémenter l'intégralité du CQRS. En général, seule la partie événementielle du composant est utilisée pour construire des applications pilotées par les événements.
Blocs de construction
Événements
Les événements représentent quelque chose qui s'est déjà produit. Les événements sont immuables.
Bus d'événements
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus transporte les événements vers les gestionnaires d'événements.
type EventBus struct {
// ...
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// GeneratePublishTopic est utilisé pour générer le nom du sujet pour la publication d'événements.
GeneratePublishTopic GenerateEventPublishTopicFn
// OnPublish est appelé avant l'envoi de l'événement. Il peut modifier *message.Message.
//
// Cette option n'est pas obligatoire.
OnPublish OnEventSendFn
// Marshaler est utilisé pour l'encodage et le décodage des événements.
// Ceci est obligatoire.
Marshaler CommandEventMarshaler
// Instance Logger pour le journalisation. Si non fourni, watermill.NopLogger est utilisé.
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Processeur d'événements
Code complet : github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor est utilisé pour déterminer quel EventHandler doit gérer les événements reçus du bus d'événements.
type EventProcessor struct {
// ...
Code complet : github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
type EventProcessorConfig struct {
// GenerateSubscribeTopic est utilisé pour générer le sujet pour s'abonner aux événements.
// Si le processeur d'événements utilise des groupes de gestionnaires, alors GenerateSubscribeTopic est utilisé.
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor est utilisé pour créer un abonné pour l'EventHandler.
//
// Cette fonction est appelée une fois pour chaque instance de EventHandler.
// Si vous voulez réutiliser un abonné pour plusieurs gestionnaires, utilisez GroupEventProcessor.
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle est appelé avant de gérer l'événement.
// OnHandle fonctionne de manière similaire à un middleware : vous pouvez injecter une logique supplémentaire avant et après le traitement de l'événement.
//
// Par conséquent, vous devez appeler explicitement params.Handler.Handle() pour gérer l'événement.
//
// func(params EventProcessorOnHandleParams) (err error) {
// // Logique avant le traitement
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Logique après le traitement
// // (...)
// return err
// }
//
// Cette option n'est pas obligatoire.
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent est utilisé pour déterminer si le message doit être accusé de réception lorsque l'événement n'a pas de gestionnaire défini.
AckOnUnknownEvent bool
// Marshaler est utilisé pour sérialiser et désérialiser les événements.
// Requis.
Marshaler CommandEventMarshaler
// Instance de journalisation pour les journaux.
// Si non fourni, watermill.NopLogger sera utilisé.
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers est pour maintenir la compatibilité ascendante.
// Cette valeur sera définie lors de la création de EventProcessor en utilisant NewEventProcessor.
// Déconseillé : migrer vers NewEventProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Processeur de Groupe d'Événements
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor détermine quel processeur d'événements doit gérer les événements reçus du bus d'événements.
// Comparé à EventProcessor, EventGroupProcessor permet à plusieurs processeurs de partager la même instance d'abonné.
type EventGroupProcessor struct {
// ...
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic est utilisé pour générer le sujet pour s'abonner aux processeurs d'événements de groupe.
// Cette option est requise pour EventProcessor lors de l'utilisation de groupes de processeurs.
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor est utilisé pour créer un abonné pour GroupEventHandler.
// Cette fonction est appelée une fois par groupe d'événements - permettant à un abonnement d'être créé pour chaque groupe.
// C'est très utile lorsque nous voulons gérer les événements d'un flux dans l'ordre.
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle est appelé avant de gérer l'événement.
// OnHandle est similaire à un middleware : vous pouvez injecter une logique supplémentaire avant et après la gestion de l'événement.
//
// Par conséquent, vous devez appeler explicitement params.Handler.Handle() pour gérer l'événement.
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // Logique avant la gestion
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Logique après la gestion
// // (...)
//
// return err
// }
//
// Cette option n'est pas obligatoire.
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent est utilisé pour déterminer s'il faut accuser réception si l'événement n'a pas de gestionnaire défini.
AckOnUnknownEvent bool
// Marshaler est utilisé pour l'encodage et le décodage des événements.
// Ceci est requis.
Marshaler CommandEventMarshaler
// Instance de journal utilisée pour le journalisation.
// Si non fourni, watermill.NopLogger sera utilisé.
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
En savoir plus sur le Processeur de Groupe d'Événements.
Gestionnaire d'Événements
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler reçoit les événements définis par NewEvent et les gère en utilisant sa méthode Handle.
// En utilisant DDD, le gestionnaire d'événements peut modifier et persister des agrégats.
// Il peut également invoquer des gestionnaires de processus, des sagas, ou simplement construire des modèles de lecture.
//
// Contrairement aux gestionnaires de commandes, chaque événement peut avoir plusieurs gestionnaires d'événements.
//
// Pendant la gestion des messages, utilisez une seule instance de EventHandler.
// Lors du passage de plusieurs événements simultanément, la méthode Handle peut être exécutée plusieurs fois en parallèle.
// Par conséquent, la méthode Handle doit être thread-safe !
type EventHandler interface {
// ...
Commande
Une commande est une structure de données simple représentant une demande d'effectuer une opération.
Bus de commandes
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus est le composant qui transporte les commandes vers les gestionnaires de commandes.
type CommandBus struct {
// ...
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic est utilisé pour générer le sujet pour publier des commandes.
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend est appelé avant de publier une commande.
// *message.Message peut être modifié.
//
// Cette option n'est pas obligatoire.
OnSend CommandBusOnSendFn
// Marshaler est utilisé pour la sérialisation et la désérialisation des commandes.
// Requis.
Marshaler CommandEventMarshaler
// Logger instance utilisée pour le journalisation.
// Si non fourni, watermill.NopLogger sera utilisé.
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Processeur de commandes
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn est utilisé pour créer un abonné pour CommandHandler.
// Il vous permet de créer un abonné personnalisé séparé pour chaque gestionnaire de commandes.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic est utilisé pour générer le sujet pour s'abonner aux commandes.
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor est utilisé pour créer un abonné pour CommandHandler.
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle est appelé avant de traiter la commande.
// OnHandle fonctionne comme un intergiciel : vous pouvez injecter une logique supplémentaire avant et après le traitement de la commande.
//
// Pour cette raison, vous devez appeler explicitement params.Handler.Handle() pour traiter la commande.
// func(params CommandProcessorOnHandleParams) (err error) {
// // logique avant le traitement
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // logique après le traitement
// // (...)
//
// return err
// }
//
// Cette option n'est pas obligatoire.
OnHandle CommandProcessorOnHandleFn
// Marshaler est utilisé pour la sérialisation et la désérialisation des commandes.
// Requis.
Marshaler CommandEventMarshaler
// Logger instance pour la journalisation.
// Si non fourni, watermill.NopLogger sera utilisé.
Logger watermill.LoggerAdapter
// Si true, CommandProcessor confirmera les messages même si CommandHandler retourne une erreur.
// Si RequestReplyBackend n'est pas nul et l'envoi de la réponse échoue, le message sera quand même rejeté.
//
// Attention : Il n'est pas recommandé d'utiliser cette option lors de l'utilisation du composant requestreply (requestreply.NewCommandHandler ou requestreply.NewCommandHandlerWithResult),
// car cela pourrait confirmer la commande lorsque l'envoi de la réponse échoue.
//
// Lors de l'utilisation de requestreply, vous devriez utiliser requestreply.PubSubBackendConfig.AckCommandErrors.
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers est utilisé pour la compatibilité ascendante.
// Il est défini lors de la création d'un CommandProcessor avec NewCommandProcessor.
// Obsolète : veuillez migrer vers NewCommandProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Processeur de commandes
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// Le CommandHandler reçoit la commande définie par NewCommand et la traite en utilisant la méthode Handle.
// Si l'usage du DDD (Domain Driven Design) est effectué, CommandHandler peut modifier et persister les agrégats.
//
// Contrairement à EventHandler, chaque commande ne peut avoir qu'un seul CommandHandler.
//
// Pendant le traitement du message, utilisez une seule instance de CommandHandler.
// Lorsque plusieurs commandes sont délivrées simultanément, la méthode Handle peut être exécutée plusieurs fois en parallèle.
// Par conséquent, la méthode Handle doit être protégée contre les problèmes de concurrence !
type CommandHandler interface {
// ...
Marshaler de commandes et événements
Code source complet : github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler marshalise les commandes et les événements en messages Watermill, et vice versa.
// La charge utile de la commande doit être marshalisée en []bytes.
type CommandEventMarshaler interface {
// Marshalise la commande ou l'événement en un message Watermill.
Marshal(v interface{}) (*message.Message, error)
// Décodage du message Watermill en la commande ou l'événement v.
Unmarshal(msg *message.Message, v interface{}) (err error)
// Nom retourne le nom de la commande ou de l'événement.
// Le nom peut être utilisé pour déterminer si la commande ou l'événement reçu est celui que nous voulons traiter.
Name(v interface{}) string
// NameFromMessage retourne le nom de la commande ou de l'événement à partir du message Watermill (généré par Marshal).
//
// Lorsque nous avons des commandes ou des événements marshalisés en messages Watermill, nous devrions utiliser NameFromMessage au lieu de Name pour éviter un décodage inutile.
NameFromMessage(msg *message.Message) string
}
// ...
Utilisation
Exemple de domaine
Utilisation d'un domaine simple chargé de gérer les réservations de chambre dans un hôtel.
Nous utiliserons des symboles Event Storming pour présenter le modèle de ce domaine.
Légende des symboles :
- Les post-it bleus sont des commandes
- Les post-it orange sont des événements
- Les post-it verts sont des modèles de lecture générés de façon asynchrone à partir des événements
- Les post-it violet sont des politiques déclenchées par des événements et générant des commandes
- Les post-it roses sont des points chauds ; nous marquons les zones qui rencontrent souvent des problèmes
Le domaine est simple :
- Les clients peuvent réserver des chambres.
-
Chaque fois qu'une chambre est réservée, nous commandons une bouteille de bière pour le client (parce que nous aimons nos invités).
- Nous savons que parfois la bière vient à manquer.
- Nous générons un rapport financier basé sur la réservation.
Envoi de commandes
Tout d'abord, nous devons simuler les actions des clients.
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
Gestionnaire de commandes
Le BookRoomHandler
gérera nos commandes.
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler est un gestionnaire de commandes qui traite la commande BookRoom et émet l'événement RoomBooked.
//
// En CQRS, une commande doit être traitée par un gestionnaire.
// Lors de l'ajout d'un autre gestionnaire pour traiter cette commande, une erreur sera retournée.
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand retourne le type de commande que ce gestionnaire doit traiter. Il doit s'agir d'un pointeur.
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c est toujours le type retourné par `NewCommand`, donc l'assertion de type est toujours sécurisée
cmd := c.(*BookRoom)
// Un prix aléatoire, qui peut être calculé de manière plus judicieuse en production réelle
prix := (rand.Int63n(40) + 1) * 10
log.Printf(
"Chambre réservée %s, de %s à %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked sera traité par le gestionnaire d'événements OrderBeerOnRoomBooked,
// et à l'avenir, RoomBooked pourra être géré par plusieurs gestionnaires d'événements
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Prix: prix,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked est un gestionnaire d'événements qui traite l'événement RoomBooked et émet la commande OrderBeer.
// ...
Gestionnaires d'événements
Comme mentionné précédemment, nous voulons commander une bouteille de bière chaque fois qu'une chambre est réservée (indiqué par "Quand la chambre est réservée"). Nous réalisons cela en utilisant la commande OrderBeer
.
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked est un gestionnaire d'événements qui traite l'événement RoomBooked et émet la commande OrderBeer.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// Ce nom est transmis à EventsSubscriberConstructor pour générer le nom de la file d'attente
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler est un gestionnaire de commandes qui traite la commande OrderBeer et émet l'événement BeerOrdered.
// ...
OrderBeerHandler
est très similaire à BookRoomHandler
. La seule différence est qu'il retourne parfois une erreur lorsqu'il n'y a pas assez de bière, ce qui provoque la réémission de la commande. Vous pouvez trouver l'implémentation complète dans le code source d'exemple.
Groupes de gestionnaires d'événements
Par défaut, chaque gestionnaire d'événements a une instance d'abonné distincte. Cette approche fonctionne bien si seulement un type d'événement est envoyé au sujet.
Dans le cas de plusieurs types d'événements sur le sujet, il existe deux options :
- Vous pouvez définir
EventConfig.AckOnUnknownEvent
surtrue
- cela acknowle toutes les événements non gérés par les gestionnaires. - Vous pouvez utiliser le mécanisme de groupe de gestionnaires d'événements.
Pour utiliser des groupes d'événements, vous devez définir les options GenerateHandlerGroupSubscribeTopic
et GroupSubscriberConstructor
dans EventConfig
.
Ensuite, vous pouvez utiliser AddHandlersGroup
sur l'EventProcessor
.
Code source complet: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Bières commandées", watermill.LogFields{
"id_chambre": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
GenerateHandlerGroupSubscribeTopic
et GroupSubscriberConstructor
reçoivent tous deux des informations sur le nom du groupe en tant que paramètres de la fonction.
Gestionnaires génériques
À partir de Watermill v1.3, des gestionnaires génériques peuvent être utilisés pour gérer les commandes et les événements. Cela est très utile lorsque vous avez un grand nombre de commandes/événements et que vous ne voulez pas créer un gestionnaire pour chacun.
Code source complet: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Bières commandées", watermill.LogFields{
"id_chambre": event.RoomId,
})
return nil
}),
// ...
En coulisse, cela crée une implémentation de EventHandler ou CommandHandler. Il convient à tous les types de gestionnaires.
Code source complet: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler crée une nouvelle implémentation de CommandHandler basée sur la fonction fournie et le type de commande déduit à partir des paramètres de la fonction.
func NewCommandHandler[Command any](
// ...
Code source complet: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler crée une nouvelle implémentation de EventHandler basée sur la fonction fournie et le type d'événement déduit à partir des paramètres de la fonction.
func NewEventHandler[T any](
// ...
Code source complet: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler crée une nouvelle implémentation de GroupEventHandler basée sur la fonction fournie et le type d'événement déduit à partir des paramètres de la fonction.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
Construction d'un modèle de lecture à l'aide de gestionnaires d'événements
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport est un modèle de lecture qui calcule combien d'argent nous pouvons gagner avec les réservations.
// Il écoute les événements RoomBooked lorsqu'ils se produisent.
//
// Cette implémentation écrit simplement en mémoire. Dans un environnement de production, vous pourriez utiliser une forme de stockage persistant.
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// Ce nom est transmis à EventsSubscriberConstructor et utilisé pour générer le nom de la file d'attente
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// Handle peut être appelé de manière concourante, donc la sécurité des threads est nécessaire.
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// Lors de l'utilisation de l'envoi/réception de messages qui ne fournit pas de sémantique de livraison exactement une fois, nous devons supprimer les messages en double.
// GoChannel Pub/Sub fournit une livraison exactement une fois,
// mais préparons cet exemple pour d'autres implémentations de l'envoi/réception de messages.
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> Chambre réservée pour $%d\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
Connecter tout
Nous disposons déjà de tous les composants nécessaires pour construire une application CQRS.
Nous utiliserons AMQP (RabbitMQ) comme notre courtier de messages : AMQP.
Sous le capot, CQRS utilise le routeur de messages de Watermill. Si vous n'êtes pas familier avec cela et que vous souhaitez comprendre comment cela fonctionne, vous devriez consulter le guide de démarrage. Il vous montrera également comment utiliser certaines configurations standard telles que les métriques, les files de messages indésirables, la limitation de débit, la corrélation, et d'autres outils utilisés par chaque application pilotée par les messages. Ces outils sont déjà intégrés dans Watermill.
Revenons à CQRS. Comme vous le savez déjà, CQRS se compose de plusieurs composants tels que des bus de commandes ou d'événements, des processeurs, et ainsi de suite.
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
C'est tout. Nous avons une application CQRS exécutable.