Meccanismo CQRS

CQRS sta per "Command Query Responsibility Segregation". Esso separa la responsabilità dei comandi (richieste di scrittura) e delle query (richieste di lettura). Le richieste di scrittura e di lettura sono gestite da oggetti diversi.

Questo è il CQRS. Possiamo ulteriormente separare l'archiviazione dei dati, avendo archiviazioni separate per la lettura e la scrittura. Una volta fatto ciò, potrebbero esserci molte archiviazioni per la lettura, ottimizzate per gestire diversi tipi di query o che si estendono su molti contesti delimitati. Anche se l'archiviazione separata per lettura/scrittura è spesso oggetto di discussione relativo al CQRS, non rappresenta il CQRS stesso. Il CQRS è solo la prima separazione di comando e query.

Diagramma dell'architettura CQRS

Il componente cqrs fornisce alcune astrazioni utili, costruite su Pub/Sub e Router, per aiutare a implementare il modello CQRS.

Non è necessario implementare l'intero CQRS. Di solito, si utilizza solo la parte degli eventi del componente per costruire applicazioni basate sugli eventi.

Elementi costitutivi

Eventi

Gli eventi rappresentano qualcosa che è già accaduto. Gli eventi sono immutabili.

Autobus degli eventi

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBus trasporta gli eventi ai gestori degli eventi.
type EventBus struct {
// ...

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic viene utilizzato per generare il nome del topic per la pubblicazione degli eventi.
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish viene chiamato prima dell'invio dell'evento. Può modificare *message.Message.
    //
    // Questa opzione non è obbligatoria.
    OnPublish OnEventSendFn

    // Marshaler viene utilizzato per codificare e decodificare gli eventi.
    // Questo è obbligatorio.
    Marshaler CommandEventMarshaler

    // Istanza del logger per il logging. Se non fornito, verrà utilizzato watermill.NopLogger.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Processore di eventi

Codice completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// L'EventProcessor viene utilizzato per determinare l'EventHandler che gestirà gli eventi ricevuti dall'autobus degli eventi.
type EventProcessor struct {
// ...

Codice completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic viene utilizzato per generare il topic per iscriversi agli eventi.
	// Se il processore di eventi utilizza gruppi di gestori, viene utilizzato GenerateSubscribeTopic.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor viene utilizzato per creare un sottoscrittore per l'EventHandler.
	//
	// Questa funzione viene chiamata una volta per ogni istanza di EventHandler.
	// Se si desidera riutilizzare un sottoscrittore per più gestori, utilizzare GroupEventProcessor.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle viene chiamato prima di gestire l'evento.
	// OnHandle funziona in modo simile a un middleware: è possibile iniettare logiche aggiuntive prima e dopo il trattamento dell'evento.
	//
	// Pertanto, è necessario chiamare esplicitamente params.Handler.Handle() per gestire l'evento.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // Logica prima del trattamento
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // Logica dopo il trattamento
	//      //  (...)

	//      return err
	//  }
	//
	// Questa opzione non è obbligatoria.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent viene utilizzato per determinare se il messaggio dovrebbe essere riconosciuto quando l'evento non ha un gestore definito.
	AckOnUnknownEvent bool

	// Marshaler viene utilizzato per marshalling e unmarshalling degli eventi.
	// Richiesto.
	Marshaler CommandEventMarshaler

	// Istanza Logger per il logging.
	// Se non fornito, verrà utilizzato watermill.NopLogger.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers è per mantenere la compatibilità con le versioni precedenti.
	// Questo valore verrà impostato durante la creazione di EventProcessor utilizzando NewEventProcessor.
	// Obsoleto: migrare a NewEventProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Processore del Gruppo di Eventi

Codice Sorgente Completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor determina quale processore di eventi deve gestire gli eventi ricevuti dal bus degli eventi.
// Rispetto a EventProcessor, EventGroupProcessor consente a più processori di condividere la stessa istanza sottoscritta.
type EventGroupProcessor struct {
// ...

Codice Sorgente Completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic viene utilizzato per generare l'argomento per sottoscrivere i gruppi di processori di eventi.
	// Questa opzione è richiesta per EventProcessor quando si utilizzano gruppi di processori.
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor viene utilizzato per creare un sottoscrittore per GroupEventHandler.
	// Questa funzione viene chiamata una volta per ogni gruppo di eventi, permettendo la creazione di una sottoscrizione per ogni gruppo.
	// È molto utile quando si desidera gestire gli eventi da uno stream in ordine.
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle viene chiamato prima di gestire l'evento.
	// OnHandle è simile a un middleware: è possibile iniettare logica aggiuntiva prima e dopo il trattamento dell'evento.
	//
	// Pertanto, è necessario chiamare esplicitamente params.Handler.Handle() per gestire l'evento.
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // Logica prima del trattamento
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // Logica dopo il trattamento
	//     //  (...)
	//
	//     return err
	// }
	//
	// Questa opzione non è richiesta.
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent viene utilizzato per determinare se riconoscere se l'evento non ha un gestore definito.
	AckOnUnknownEvent bool

	// Marshaler è utilizzato per la codifica e decodifica degli eventi.
	// Questo è richiesto.
	Marshaler CommandEventMarshaler

	// Istanza Logger utilizzata per il logging.
	// Se non fornito, verrà utilizzato watermill.NopLogger.
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Per saperne di più sulla Gestione del Gruppo di Eventi.

Gestore degli Eventi

Codice Sorgente Completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler riceve eventi definiti da NewEvent e li gestisce utilizzando il suo metodo Handle.
// Se si utilizza DDD, il gestore degli eventi può modificare e persistere aggregate.
// Può anche invocare gestori di processi, saghe o semplicemente costruire modelli di lettura.
//
// A differenza dei gestori dei comandi, ogni evento può avere più gestori di eventi.
//
// Durante la gestione dei messaggi, utilizzare un'unica istanza di EventHandler.
// Quando si passano più eventi contemporaneamente, il metodo Handle può essere eseguito più volte in modo concorrente.
// Pertanto, il metodo Handle deve essere thread-safe!
type EventHandler interface {
// ...

Comando

Un comando è una semplice struttura dati che rappresenta una richiesta per eseguire un'operazione.

Bus dei Comandi

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus è il componente che trasporta i comandi ai gestori dei comandi.
type CommandBus struct {
// ...

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic è utilizzato per generare il topic per la pubblicazione dei comandi.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend viene chiamato prima di pubblicare un comando.
	// *message.Message può essere modificato.
	//
	// Questa opzione non è obbligatoria.
	OnSend CommandBusOnSendFn

	// Marshaler viene utilizzato per la serializzazione e deserializzazione dei comandi.
	// Richiesto.
	Marshaler CommandEventMarshaler

	// Istanza Logger utilizzata per il logging.
	// Se non fornito, verrà utilizzato watermill.NopLogger.
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Processore dei Comandi

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn viene utilizzato per creare un subscriber per il gestore dei comandi.
// Ti consente di creare un subscriber personalizzato separato per ciascun gestore dei comandi.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic è utilizzato per generare il topic per sottoscriversi ai comandi.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor è utilizzato per creare un subscriber per il gestore dei comandi.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle viene chiamato prima di gestire il comando.
	// OnHandle funziona come un middleware: puoi iniettare logiche aggiuntive prima e dopo il gestire il comando.
	//
	// A causa di questo, è necessario chiamare esplicitamente params.Handler.Handle() per gestire il comando.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // logica prima della gestione
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // logica dopo la gestione
	//      // (...)
	//
	//      return err
	//  }
	//
	// Questa opzione non è richiesta.
	OnHandle CommandProcessorOnHandleFn

	// Marshaler è utilizzato per la serializzazione e deserializzazione dei comandi.
	// Richiesto.
	Marshaler CommandEventMarshaler

	// Istanza Logger per il logging.
	// Se non fornito, verrà utilizzato watermill.NopLogger.
	Logger watermill.LoggerAdapter

	// Se true, CommandProcessor confermerà i messaggi anche se il CommandHandler restituisce un errore.
	// Se RequestReplyBackend non è nullo e l'invio della risposta fallisce, il messaggio verrà comunque rifiutato.
	//
	// Avviso: non è consigliato utilizzare questa opzione quando si utilizza il componente requestreply (requestreply.NewCommandHandler o requestreply.NewCommandHandlerWithResult),
	// in quanto potrebbe confermare il comando quando l'invio della risposta fallisce.
	//
	// Quando si utilizza requestreply, è necessario utilizzare requestreply.PubSubBackendConfig.AckCommandErrors.
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers è utilizzato per la compatibilità all'indietro.
	// Viene impostato durante la creazione di un CommandProcessor con NewCommandProcessor.
	// Obsoleto: si prega di migrare a NewCommandProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Elaboratore di comandi

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// Il CommandHandler riceve il comando definito da NewCommand e lo gestisce utilizzando il metodo Handle.
// Se si usa DDD, il CommandHandler può modificare e persistere gli aggregati.
//
// A differenza dell'EventHandler, ogni Command può avere solo un CommandHandler.
//
// Durante la gestione dei messaggi, utilizzare un'istanza di CommandHandler.
// Quando vengono consegnati contemporaneamente più comandi, il metodo Handle può essere eseguito più volte contemporaneamente.
// Pertanto, il metodo Handle deve essere thread-safe!
type CommandHandler interface {
// ...

Marshaler di Comandi ed Eventi

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler marshals i comandi e gli eventi in messaggi Watermill e viceversa.
// Il payload del comando deve essere marshaled in []bytes.
type CommandEventMarshaler interface {
	// Marshal marshals il comando o l'evento in un messaggio Watermill.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal decodes il messaggio Watermill nel comando o evento v.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name restituisce il nome del comando o dell'evento.
	// Il nome può essere utilizzato per determinare se il comando o l'evento ricevuto è quello che vogliamo processare.
	Name(v interface{}) string

	// NameFromMessage restituisce il nome del comando o dell'evento dal messaggio Watermill (generato da Marshal).
	//
	// Quando abbiamo comandi o eventi marshaled in messaggi Watermill, dovremmo utilizzare NameFromMessage invece di Name per evitare decodifiche inutili.
	NameFromMessage(msg *message.Message) string
}
// ...

Utilizzo

Esempio di Dominio

Utilizzando un dominio semplice responsabile della gestione delle prenotazioni delle camere in un hotel.

Utilizzeremo simboli di Event Storming per mostrare il modello di questo dominio.

Leggenda dei simboli:

  • I post-it blu sono comandi
  • I post-it arancioni sono eventi
  • I post-it verdi sono modelli di lettura generati in modo asincrono da eventi
  • I post-it viola sono politiche attivate dagli eventi e che generano comandi
  • I post-it rosa sono punti critici; segnaliamo aree che incontrano spesso problemi

CQRS Event Storming

Il dominio è semplice:

  • I clienti possono prenotare camere.
  • Ogni volta che viene prenotata una camera, ordiniamo una bottiglia di birra per il cliente (perché amiamo i nostri ospiti).
    • Sappiamo che a volte finisce la birra.
  • Generiamo un report finanziario basato sulla prenotazione.

Invio di Comandi

In primo luogo, dobbiamo simulare le azioni dei clienti.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		prenotaCameraCmd := &PrenotaCamera{
			IdCamera:    fmt.Sprintf("%d", i),
			NomeOspite: "John",
			DataInizio: dataInizio,
			DataFine:   dataFine,
		}
		if err := commandBus.Send(context.Background(), prenotaCameraCmd); err != nil {
			panic(err)
		}
// ...

Gestore dei comandi

Il BookRoomHandler gestirà i nostri comandi.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler è un gestore dei comandi che elabora il comando BookRoom ed emette l'evento RoomBooked.
//
// Nel CQRS, un comando deve essere elaborato da un gestore.
// Aggiungendo un altro gestore per gestire questo comando, verrà restituito un errore.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) NomeGestore() string {
	return "BookRoomHandler"
}

// NewCommand restituisce il tipo di comando che questo gestore dovrebbe elaborare. Deve essere un puntatore.
func (b BookRoomHandler) NuovoComando() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Gestisci(ctx context.Context, c interface{}) error {
// c è sempre il tipo restituito da `NewCommand`, quindi l'asserzione di tipo è sempre sicura
cmd := c.(*BookRoom)

// Un prezzo casuale, che potrebbe essere calcolato in modo più sensato in produzione effettiva
price := (rand.Int63n(40) + 1) * 10

log.Printf(
"Prenotato %s, da %s a %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)

// RoomBooked sarà gestito dal gestore dell'evento OrderBeerOnRoomBooked,
// e in futuro, RoomBooked potrà essere gestito da più gestori degli eventi
if err := b.eventBus.Pubblica(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}

return nil
}

// OrderBeerOnRoomBooked è un gestore degli eventi che elabora l'evento RoomBooked ed emette il comando OrderBeer.
// ...

Gestori degli eventi

Come già accennato, vogliamo ordinare una bottiglia di birra ogni volta che una stanza viene prenotata (etichettata con "Quando la stanza viene prenotata"). Per farlo, usiamo il comando OrderBeer.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked è un gestore degli eventi che elabora l'evento RoomBooked ed emette il comando OrderBeer.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) NomeGestore() string {
// Questo nome viene passato a EventsSubscriberConstructor per generare il nome della coda
return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NuovoEvento() interface{} {
return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Gestisci(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)

orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}

return o.commandBus.Invio(ctx, orderBeerCmd)
}

// OrderBeerHandler è un gestore dei comandi che elabora il comando OrderBeer ed emette l'evento BeerOrdered.
// ...

OrderBeerHandler è molto simile a BookRoomHandler. L'unico cambiamento è che a volte restituisce un errore quando non c'è abbastanza birra, causando il reinoltro del comando. Puoi trovare l'implementazione completa nel codice sorgente dell'esempio.

Gruppi di gestori di eventi

Per impostazione predefinita, ogni gestore di eventi ha un'istanza di sottoscrittore separata. Questo approccio funziona bene se viene inviato solo un tipo di evento all'argomento.

Nel caso di più tipi di eventi sull'argomento, ci sono due opzioni:

  1. È possibile impostare EventConfig.AckOnUnknownEvent su true - questo riconoscerà tutti gli eventi non gestiti dai gestori.
  2. È possibile utilizzare il meccanismo dei gruppi di gestori di eventi.

Per utilizzare i gruppi di eventi, è necessario impostare le opzioni GenerateHandlerGroupSubscribeTopic e GroupSubscriberConstructor in EventConfig.

Quindi, è possibile utilizzare AddHandlersGroup sull'EventProcessor.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Birra ordinata", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

Entrambi GenerateHandlerGroupSubscribeTopic e GroupSubscriberConstructor ricevono informazioni sul nome del gruppo come parametri della funzione.

Gestori generici

A partire da Watermill v1.3, è possibile utilizzare gestori generici per gestire comandi ed eventi. È molto utile quando si ha un gran numero di comandi/eventi e non si vuole creare un gestore per ciascuno.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Birra ordinata", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

In background, crea un'implementazione di EventHandler o CommandHandler. È adatto a tutti i tipi di gestori.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler crea una nuova implementazione di CommandHandler in base alla funzione fornita e al tipo di comando inferito dai parametri della funzione.
func NewCommandHandler[Command any](
// ...

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler crea una nuova implementazione di EventHandler in base alla funzione fornita e al tipo di evento inferito dai parametri della funzione.
func NewEventHandler[T any](
// ...

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler crea una nuova implementazione di GroupEventHandler in base alla funzione fornita e al tipo di evento inferito dai parametri della funzione.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

Costruzione di un modello di lettura utilizzando gli Event Handlers

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport è un modello di lettura che calcola quanto denaro possiamo guadagnare dalle prenotazioni.
// Ascolta gli eventi RoomBooked quando si verificano.
//
// Questa implementazione scrive semplicemente in memoria. In un ambiente di produzione, potresti utilizzare una forma di archiviazione persistente.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// Questo nome viene passato a EventsSubscriberConstructor e utilizzato per generare il nome della coda
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// Handle potrebbe essere chiamato in modo concorrente, quindi è necessaria la sicurezza del thread.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// Quando si utilizza Pub/Sub che non fornisce semantica di consegna esattamente una volta, è necessario deduplicare i messaggi.
	// GoChannel Pub/Sub fornisce la consegna esattamente una volta,
	// ma prepariamo questo esempio per altre implementazioni di Pub/Sub.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> Camera prenotata per $%d\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

Collega Tutto

Abbiamo già tutti i componenti necessari per costruire un'applicazione CQRS.

Utilizzeremo AMQP (RabbitMQ) come nostro message broker: AMQP.

Sotto il cofano, CQRS utilizza il message router di Watermill. Se non sei familiare con questo e vuoi capire come funziona, dovresti dare un'occhiata alla guida introduttiva. Ti mostrerà anche come utilizzare alcuni modelli di messaggistica standard come metriche, code dei messaggi tossici, limitazione della velocità, correlazione e altri strumenti utilizzati da ogni applicazione basata su messaggi. Questi strumenti sono già integrati in Watermill.

Torniamo a parlare di CQRS. Come già sai, CQRS è composto da vari componenti come bus dei comandi o degli eventi, processori, ecc.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

È tutto. Abbiamo un'applicazione CQRS eseguibile.