Mechanizm CQRS

CQRS oznacza “Command Query Responsibility Segregation”. Oddziela odpowiedzialność za polecenie (żądania zapisu) i zapytanie (żądania odczytu). Żądania zapisu i odczytu są obsługiwane przez różne obiekty.

To jest CQRS. Możemy dodatkowo oddzielić przechowywanie danych, mając osobne przechowywanie odczytu i zapisu. Po wykonaniu tego kroku może istnieć wiele przechowujących odczyt zoptymalizowanych do obsługi różnych typów zapytań lub obejmujących wiele ograniczonych kontekstów. Chociaż oddzielne przechowywanie odczytu/zapisu jest często tematem dyskusji związanym z CQRS, to samo CQRS nim nie jest. CQRS to tylko pierwsze oddzielenie polecenia i zapytania.

Diagram architektury CQRS

Składnik cqrs dostarcza kilka przydatnych abstrakcji, zbudowanych na bazie Pub/Sub i Router, które pomagają w implementacji wzorca CQRS.

Nie trzeba implementować całego CQRS. Zazwyczaj tylko część zdarzeń tego komponentu jest wykorzystywana do budowy aplikacji opartych na zdarzeniach.

Elementy składowe

Zdarzenia

Zdarzenia reprezentują coś, co już się wydarzyło. Zdarzenia są niemutowalne.

Magistrala zdarzeń

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBus transportuje zdarzenia do obsługujących zdarzenia.
type EventBus struct {
// ...

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic służy do generowania nazwy tematu do publikowania zdarzeń.
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish jest wywoływane przed wysłaniem zdarzenia. Może modyfikować *message.Message.
    //
    // Ta opcja nie jest obowiązkowa.
    OnPublish OnEventSendFn

    // Marshaler jest używany do kodowania i dekodowania zdarzeń.
    // To jest obowiązkowe.
    Marshaler CommandEventMarshaler

    // Instancja loggera do logowania. Jeśli nie jest podana, używany jest watermill.NopLogger.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Procesor zdarzeń

Pełny kod: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor służy do określenia EventHandlera, który ma obsłużyć zdarzenia otrzymane z magistrali zdarzeń.
type EventProcessor struct {
// ...

Pełny kod: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
    // GenerateSubscribeTopic służy do generowania tematu do subskrybowania zdarzeń.
    // Jeśli procesor zdarzeń używa grup obsługujących, to używane jest GenerataSubscribeTopic.
    GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

    // SubscriberConstructor służy do tworzenia subskrybenta dla EventHandlera.
    //
    // Ta funkcja jest wywoływana raz dla każdej instancji EventHandlera.
    // Jeśli chcesz ponownie użyć subskrybenta dla wielu obsługujących, użyj GroupEventProcessor.
    SubscriberConstructor EventProcessorSubscriberConstructorFn

    // OnHandle jest wywoływane przed obsługą zdarzenia.
    // OnHandle działa podobnie jak oprogramowanie pośredniczące: możesz wstrzyknąć dodatkową logikę przed i po obsłudze zdarzenia.
    //
    // Dlatego musisz jawnie wywołać params.Handler.Handle(), aby obsłużyć zdarzenie.
    //
    //  func(params EventProcessorOnHandleParams) (err error) {
    //      // Logika przed obsługą
    //      //  (...)

    //      err := params.Handler.Handle(params.Message.Context(), params.Event)
    //
    //      // Logika po obsłudze
    //      //  (...)

    //      return err
    //  }
    //
    // Ta opcja nie jest obowiązkowa.
    OnHandle EventProcessorOnHandleFn

    // AckOnUnknownEvent służy do określenia, czy wiadomość powinna być potwierdzona, gdy zdarzenie nie ma zdefiniowanego obsługującego.
    AckOnUnknownEvent bool

    // Marshaler służy do marshalingu i unmarshalingu zdarzeń.
    // Wymagane.
    Marshaler CommandEventMarshaler

    // Instancja rejestrowania dziennika.
    // Jeśli nie zostanie podana, zostanie użyty watermill.NopLogger.
    Logger watermill.LoggerAdapter

    // disableRouterAutoAddHandlers służy do zachowania kompatybilności wstecznej.
    // Ta wartość zostanie ustawiona podczas tworzenia EventProcessor za pomocą NewEventProcessor.
    // Wycofane: przejdź do NewEventProcessorWithConfig.
    disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Procesor grupy zdarzeń

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor określa, który procesor zdarzeń powinien obsługiwać zdarzenia otrzymane z magistrali zdarzeń.
// W porównaniu do EventProcessor, EventGroupProcessor pozwala wielu procesorom dzielić tę samą instancję subskrybenta.
type EventGroupProcessor struct {
// ...

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
    // GenerateSubscribeTopic służy do generowania tematu subskrypcji dla grup procesorów zdarzeń.
    // Ta opcja jest wymagana dla EventProcessor przy użyciu grup procesorów.
    GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

    // SubscriberConstructor służy do tworzenia subskrybenta dla GroupEventHandler.
    // Ta funkcja jest wywoływana raz na grupę zdarzeń - umożliwiając utworzenie subskrypcji dla każdej grupy.
    // Jest bardzo przydatna, gdy chcemy obsługiwać zdarzenia ze strumienia w określonej kolejności.
    SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

    // OnHandle jest wywoływane przed obsługą zdarzenia.
    // OnHandle jest podobne do oprogramowania pośredniczącego: możesz wstrzyknąć dodatkową logikę przed i po obsłudze zdarzenia.
    //
    // Dlatego też musisz wywołać jawnie params.Handler.Handle(), aby obsłużyć zdarzenie.
    //
    // func(params EventGroupProcessorOnHandleParams) (err error) {
    //     // Logika przed obsługą
    //     //  (...)
    //
    //     err := params.Handler.Handle(params.Message.Context(), params.Event)
    //
    //     // Logika po obsłudze
    //     //  (...)
    //
    //     return err
    // }
    //
    // Ta opcja nie jest wymagana.
    OnHandle EventGroupProcessorOnHandleFn

    // AckOnUnknownEvent służy do określenia, czy należy potwierdzić, jeśli zdarzenie nie ma zdefiniowanego obsługującego.
    AckOnUnknownEvent bool

    // Marshaler jest używany do kodowania i dekodowania zdarzeń.
    // To jest wymagane.
    Marshaler CommandEventMarshaler

    // Instancja rejestrowania używana do logowania.
    // Jeśli nie zostanie podana, użyta zostanie watermill.NopLogger.
    Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Dowiedz się więcej o Procesorze Grupy Zdarzeń.

Obsługa Zdarzeń

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler odbiera zdarzenia zdefiniowane przez NewEvent i obsługuje je za pomocą swojej metody Handle.
// Jeśli używany jest DDD, obsługa zdarzeń może modyfikować i utrwalać agregaty.
// Może również wywoływać menedżery procesów, sagi lub po prostu budować modele odczytu.
//
// W przeciwieństwie do obsługujących poleceń, każde zdarzenie może mieć wiele obsługujących zdarzeń.
//
// Podczas obsługi wiadomości, użyj jednej instancji EventHandler.
// Przekazywanie wielu zdarzeń jednocześnie może skutkować wielokrotnym równoczesnym wykonaniem metody Handle.
// Dlatego też metoda Handle musi być bezpieczna dla wątków!
type EventHandler interface {
// ...

Polecenie

Polecenie to prosta struktura danych reprezentująca żądanie wykonania pewnej operacji.

Magazyn poleceń

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus to komponent transportujący polecenia do obsługujących poleceń.
type CommandBus struct {
// ...

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
    // GeneratePublishTopic służy do generowania tematu dla publikowania poleceń.
    GeneratePublishTopic CommandBusGeneratePublishTopicFn

    // OnSend jest wywoływane przed opublikowaniem polecenia.
    // *message.Message można modyfikować.
    //
    // Ta opcja nie jest obowiązkowa.
    OnSend CommandBusOnSendFn

    // Marshaler służy do serializacji i deserializacji poleceń.
    // Wymagane.
    Marshaler CommandEventMarshaler

    // Instancja Logger używana do logowania.
    // Jeśli nie zostanie podana, zostanie użyty watermill.NopLogger.
    Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Procesor poleceń

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn służy do tworzenia subskrybenta dla obsługującego poleceń.
// Umożliwia tworzenie osobnego niestandardowego subskrybenta dla każdego obsługującego polecenie.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
    // GenerateSubscribeTopic służy do generowania tematu dla subskrybowania poleceń.
    GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

    // SubscriberConstructor jest używany do tworzenia subskrybenta dla obsługującego poleceń.
    SubscriberConstructor CommandProcessorSubscriberConstructorFn

    // OnHandle jest wywoływane przed obsłużeniem polecenia.
    // OnHandle działa jak pośrednik: można wstrzyknąć dodatkową logikę przed i po obsłużeniu polecenia.
    //
    // Z tego powodu, musisz wywołać params.Handler.Handle() jawnie, aby obsłużyć polecenie.
    //  func(params CommandProcessorOnHandleParams) (err error) {
    //      // logika przed obsłużeniem
    //      // (...)
    //
    //      err := params.Handler.Handle(params.Message.Context(), params.Command)
    //
    //      // logika po obsłużeniu
    //      // (...)
    //
    //      return err
    //  }
    //
    // Ta opcja nie jest wymagana.
    OnHandle CommandProcessorOnHandleFn

    // Marshaler jest używany do serializacji i deserializacji poleceń.
    // Wymagane.
    Marshaler CommandEventMarshaler

    // Instancja Logger do logowania.
    // Jeśli nie zostanie podana, zostanie użyty watermill.NopLogger.
    Logger watermill.LoggerAdapter

    // Jeśli true, CommandProcessor będzie potwierdzać wiadomości nawet jeśli obsługujący polecenia zwraca błąd.
    // Jeśli RequestReplyBackend nie jest pusty i wysłanie odpowiedzi się nie powiedzie, wiadomość zostanie nadal odrzucona.
    //
    // Ostrzeżenie: Nie zaleca się używania tej opcji podczas korzystania z komponentu requestreply (requestreply.NewCommandHandler lub requestreply.NewCommandHandlerWithResult),
    // ponieważ może potwierdzać polecenie, gdy wysyłanie odpowiedzi się nie powiedzie.
    //
    // Korzystając z requestreply, należy użyć requestreply.PubSubBackendConfig.AckCommandErrors.
    AckCommandHandlingErrors bool

    // disableRouterAutoAddHandlers jest używane dla zachowania kompatybilności wstecznej.
    // Jest ustawiane podczas tworzenia CommandProcessor z NewCommandProcessor.
    // Przestarzałe: proszę przenieść się do NewCommandProcessorWithConfig.
    disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Procesor poleceń

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler odbiera polecenie zdefiniowane przez NewCommand i obsługuje je za pomocą metody Handle.
// Jeśli korzystamy z DDD, CommandHandler może modyfikować i zapisywać zbiory.
//
// W przeciwieństwie do EventHandler, każde polecenie może mieć tylko jeden CommandHandler.
//
// Podczas obsługi wiadomości, użyj jednej instancji CommandHandler.
// W przypadku dostarczenia wielu poleceń jednocześnie, metoda Handle może być wykonana wielokrotnie równolegle.
// Dlatego metoda Handle musi być bezpieczna wątkowo!
type CommandHandler interface {
// ...

Konwerter poleceń i zdarzeń

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler zamienia polecenia i zdarzenia na wiadomości Watermill, i odwrotnie.
// Ładunek polecenia musi być zamieniony na []bytes.
type CommandEventMarshaler interface {
    // Marshal zamienia polecenie lub zdarzenie na wiadomość Watermill.
    Marshal(v interface{}) (*message.Message, error)

    // Unmarshal dekoduje wiadomość Watermill na polecenie lub zdarzenie v.
    Unmarshal(msg *message.Message, v interface{}) (err error)

    // Name zwraca nazwę polecenia lub zdarzenia.
    // Nazwa może być używana do określenia czy otrzymane polecenie lub zdarzenie jest tym, które chcemy przetworzyć.
    Name(v interface{}) string

    // NameFromMessage zwraca nazwę polecenia lub zdarzenia z wiadomości Watermill (wygenerowanej przez Marshal).
    //
    // Gdy mamy polecenia lub zdarzenia zamienione na wiadomości Watermill, powinniśmy używać NameFromMessage zamiast Name, aby uniknąć niepotrzebnego dekodowania.
    NameFromMessage(msg *message.Message) string
}
// ...

Użycie

Przykładowa dziedzina

Wykorzystując prostą dziedzinę, która jest odpowiedzialna za obsługę rezerwacji pokoi w hotelu.

Będziemy korzystać z symboli Event Storming, aby zaprezentować model tej dziedziny.

Legenda symboli:

  • Niebieskie karteczki to polecenia
  • Pomarańczowe karteczki to zdarzenia
  • Zielone karteczki to asynchronicznie generowane modele odczytu na podstawie zdarzeń
  • Fioletowe karteczki to strategie wywoływane przez zdarzenia i generujące polecenia
  • Różowe karteczki to punkty gorące; oznaczamy obszary, które często napotykają problemy

CQRS Event Storming

Dziedzina jest prosta:

  • Klienci mogą rezerwować pokoje.
  • Za każdym razem, gdy dokonana jest rezerwacja pokoju, zamawiamy butelkę piwa dla klienta (ponieważ kochamy naszych gości).
    • Wiemy, że czasami piwo się kończy.
  • Generujemy raport finansowy na podstawie rezerwacji.

Wysyłanie poleceń

Po pierwsze, musimy symulować działania klienta.

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
        rezerwujPokojPolecenie := &RezerwujPokoj{
            IdPokoju: fmt.Sprintf("%d", i),
            ImieGoscia: "Jan",
            DataRozpoczecia: dataRozpoczecia,
            DataZakonczenia: dataZakonczenia,
        }
        if err := autobusPolecen.Wyslij(context.Background(), rezerwujPokojPolecenie); err != nil {
            panic(err)
        }
// ...

Obsługa poleceń

BookRoomHandler będzie obsługiwał nasze polecenia.

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler to handler poleceń procesujących polecenie BookRoom i wyemitowania zdarzenia RoomBooked.
//
// W CQRS polecenie musi być przetworzone przez handler.
// Dodanie kolejnego handlera do obsługi tego polecenia spowoduje zwrócenie błędu.
type BookRoomHandler struct {
    eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
    return "BookRoomHandler"
}

// NewCommand zwraca typ polecenia, które ten handler powinien przetwarzać. Musi to być wskaźnik.
func (b BookRoomHandler) NewCommand() interface{} {
    return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
    // c jest zawsze typem zwróconym przez `NewCommand`, więc asercja typu jest zawsze bezpieczna
    cmd := c.(*BookRoom)

    // Jakaś losowa cena, która może być obliczona w bardziej sensowny sposób w rzeczywistej produkcji
    price := (rand.Int63n(40) + 1) * 10

    log.Printf(
        "Zarezerwowano pokój %s, od %s do %s",
        cmd.RoomId,
        cmd.GuestName,
        time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
        time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
    )

    // RoomBooked zostanie obsłużone przez handler zdarzeń OrderBeerOnRoomBooked,
    // a w przyszłości, RoomBooked może być obsługiwane przez wiele handlerów zdarzeń
    if err := b.eventBus.Publish(ctx, &RoomBooked{
        ReservationId: watermill.NewUUID(),
        RoomId:        cmd.RoomId,
        GuestName:     cmd.GuestName,
        Price:         price,
        StartDate:     cmd.StartDate,
        EndDate:       cmd.EndDate,
    }); err != nil {
        return err
    }

    return nil
}

// OrderBeerOnRoomBooked to handler zdarzeń, który przetwarza zdarzenie RoomBooked i wyemituje polecenie OrderBeer.
// ...

Obsługa zdarzeń

Jak wspomniano wcześniej, chcemy zamówić butelkę piwa za każdym razem, gdy pokój zostanie zarezerwowany (oznaczony jako “When the room is booked”). Realizujemy to, używając polecenia OrderBeer.

Pełen kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked to handler zdarzeń, który przetwarza zdarzenie RoomBooked i jest wyemituje polecenie OrderBeer.
type OrderBeerOnRoomBooked struct {
    commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
    // Ta nazwa jest przekazywana do konstruktora EventsSubscriberConstructor w celu wygenerowania nazwy kolejki
    return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
    return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
    event := e.(*RoomBooked)

    orderBeerCmd := &OrderBeer{
        RoomId: event.RoomId,
        Count:  rand.Int63n(10) + 1,
    }

    return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler to handler poleceń, który przetwarza polecenie OrderBeer i wyemituje zdarzenie BeerOrdered.
// ...

OrderBeerHandler jest bardzo podobny do BookRoomHandler. Jedyną różnicą jest to, że czasami zwraca błąd, gdy piwa jest niewystarczająco, powodując ponowne wydanie polecenia. Pełną implementację znajdziesz w przykładowym kodzie źródłowym.

Grupy obsługujące zdarzenia

Domyślnie każda grupa obsługująca zdarzenia ma oddzielny egzemplarz subskrybenta. Ten podejście działa dobrze, jeśli do tematu jest wysyłany tylko jeden rodzaj zdarzenia.

W przypadku wielu rodzajów zdarzeń wysyłanych do tematu są dwie opcje:

  1. Możesz ustawić EventConfig.AckOnUnknownEvent na true - to spowoduje zatwierdzenie wszystkich zdarzeń, które nie są obsługiwane przez obsługujące je programy.
  2. Możesz użyć mechanizmu grup obsługujących zdarzenia.

Aby użyć grup zdarzeń, musisz ustawić opcje GenerateHandlerGroupSubscribeTopic i GroupSubscriberConstructor w EventConfig.

Następnie możesz użyć AddHandlersGroup na EventProcessor.

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
    err = eventProcessor.AddHandlersGroup(
        "events",
        OrderBeerOnRoomBooked{commandBus},

        NewBookingsFinancialReport(),

        cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
            logger.Info("Zamówiono piwo", watermill.LogFields{
                "room_id": event.RoomId,
            })
            return nil
        }),
    )
    if err != nil {
// ...

Obie opcje GenerateHandlerGroupSubscribeTopic i GroupSubscriberConstructor otrzymują informacje o nazwie grupy jako parametry funkcji.

Ogólne programy obsługi

Rozpoczynając od wersji Watermill v1.3, ogólne programy obsługi mogą być używane do obsługi poleceń i zdarzeń. Jest to bardzo przydatne, gdy masz dużą liczbę poleceń/zdarzeń i nie chcesz tworzyć osobnego programu obsługi dla każdego z nich.

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
        cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
            logger.Info("Zamówiono piwo", watermill.LogFields{
                "room_id": event.RoomId,
            })
            return nil
        }),
// ...

Wewnętrznie tworzy ona implementację EventHandler lub CommandHandler. Nadaje się do wszystkich rodzajów programów obsługi.

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler tworzy nową implementację CommandHandler na podstawie dostarczonej funkcji i wywnioskowanego typu polecenia z parametrów funkcji.
func NewCommandHandler[Command any](
// ...

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler tworzy nową implementację EventHandler na podstawie dostarczonej funkcji i wywnioskowanego typu zdarzenia z parametrów funkcji.
func NewEventHandler[T any](
// ...

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler tworzy nową implementację GroupEventHandler na podstawie dostarczonej funkcji i wywnioskowanego typu zdarzenia z parametrów funkcji.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

Budowa modelu odczytu z wykorzystaniem obsługiwaczy zdarzeń

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport to model odczytu, który oblicza, ile pieniędzy możemy zarobić z rezerwacji.
// Słucha zdarzeń RoomBooked, gdy wystąpią.
//
// Ta implementacja po prostu zapisuje do pamięci. W środowisku produkcyjnym możesz użyć formy trwałego przechowywania.
type BookingsFinancialReport struct {
    handledBookings map[string]struct{}
    totalCharge     int64
    lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
    return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
    // Ta nazwa jest przekazywana do EventsSubscriberConstructor i używana do generowania nazwy kolejki
    return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
    return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
    // Obsługa może być wywoływana równolegle, więc wymagane jest zapewnienie bezpieczeństwa wątków.
    b.lock.Lock()
    defer b.lock.Unlock()

    event := e.(*RoomBooked)

    // Gdy używamy Pub/Sub, który nie zapewnia semantyki dostawy dokładnie jeden raz, musimy usuwać zduplikowane komunikaty.
    // Pub/Sub GoChannel zapewnia dostawę dokładnie jeden raz,
    // ale przygotujmy ten przykład do innych implementacji Pub/Sub.
    if _, ok := b.handledBookings[event.ReservationId]; ok {
        return nil
    }
    b.handledBookings[event.ReservationId] = struct{}{}

    b.totalCharge += event.Price

    fmt.Printf(">>> Pokój zarezerwowany za $%d\n", b.totalCharge)
    return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

Połącz Wszystko

Mamy już wszystkie komponenty potrzebne do zbudowania aplikacji CQRS.

Skorzystamy z AMQP (RabbitMQ) jako naszego brokera wiadomości: AMQP.

Wewnętrznie, CQRS wykorzystuje router wiadomości Watermill. Jeśli nie jesteś zaznajomiony z tym zagadnieniem i chcesz zrozumieć, jak to działa, powinieneś zajrzeć do przewodnika dla początkujących. Przewodnik pokaże również, jak korzystać z standardowych wzorców komunikacyjnych, takich jak metryki, kolejkowanie zatrutych wiadomości, ograniczanie szybkości, korelacja i inne narzędzia używane w każdej aplikacji opartej na wiadomościach. Te narzędzia są już wbudowane w Watermill.

Wróćmy do CQRS. Jak już wiesz, CQRS składa się z wielu komponentów, takich jak magistrale poleceń lub zdarzeń, procesory itp.

Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
    logger := watermill.NewStdLogger(false, false)
    cqrsMarshaler := cqrs.ProtobufMarshaler{}

    // You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
    // Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
    // Commands will be send to queue, because they need to be consumed once.
    commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
    commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
    if err != nil {
        panic(err)
    }
    commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
    if err != nil {
        panic(err)
    }

    // Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
    // (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
    eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
    if err != nil {
        panic(err)
    }

    // CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

    // Simple middleware which will recover panics from event or command handlers.
    // More about router middlewares you can find in the documentation:
    // https://watermill.io/docs/messages-router/#middleware
    //
    // List of available middlewares you can find in message/router/middleware.
    router.AddMiddleware(middleware.Recoverer)

    commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
        GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
            // we are using queue RabbitMQ config, so we need to have topic per command type
            return params.CommandName, nil
        },
        OnSend: func(params cqrs.CommandBusOnSendParams) error {
            logger.Info("Sending command", watermill.LogFields{
                "command_name": params.CommandName,
            })

            params.Message.Metadata.Set("sent_at", time.Now().String())

            return nil
        },
        Marshaler: cqrsMarshaler,
        Logger:    logger,
    })
    if err != nil {
        panic(err)
    }

    commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
        router,
        cqrs.CommandProcessorConfig{
            GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
                // we are using queue RabbitMQ config, so we need to have topic per command type
                return params.CommandName, nil
            },
            SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
                // we can reuse subscriber, because all commands have separated topics
                return commandsSubscriber, nil
            },
            OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
                start := time.Now()

                err := params.Handler.Handle(params.Message.Context(), params.Command)

                logger.Info("Command handled", watermill.LogFields{
                    "command_name": params.CommandName,
                    "duration":     time.Since(start),
                    "err":          err,
                })

                return err
            },
            Marshaler: cqrsMarshaler,
            Logger:    logger,
        },
    )
    if err != nil {
        panic(err)
    }

    eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
        GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
            // because we are using PubSub RabbitMQ config, we can use one topic for all events
            return "events", nil

            // we can also use topic per event type
            // return params.EventName, nil
        },

        OnPublish: func(params cqrs.OnEventSendParams) error {
            logger.Info("Publishing event", watermill.LogFields{
                "event_name": params.EventName,
            })

            params.Message.Metadata.Set("published_at", time.Now().String())

            return nil
        },

        Marshaler: cqrsMarshaler,
        Logger:    logger,
    })
    if err != nil {
        panic(err)
    }

    eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
        router,
        cqrs.EventGroupProcessorConfig{
            GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
                return "events", nil
            },
            SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
                config := amqp.NewDurablePubSubConfig(
                    amqpAddress,
                    amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
                )

                return amqp.NewSubscriber(config, logger)
            },

            OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
                start := time.Now()

                err := params.Handler.Handle(params.Message.Context(), params.Event)

                logger.Info("Event handled", watermill.LogFields{
                    "event_name": params.EventName,
                    "duration":   time.Since(start),
                    "err":        err,
                })

                return err
            },

            Marshaler: cqrsMarshaler,
            Logger:    logger,
        },
    )
    if err != nil {
        panic(err)
    }

    err = commandProcessor.AddHandlers(
        BookRoomHandler{eventBus},
        OrderBeerHandler{eventBus},
    )
    if err != nil {
        panic(err)
    }

    err = eventProcessor.AddHandlersGroup(
        "events",
        OrderBeerOnRoomBooked{commandBus},

        NewBookingsFinancialReport(),

        cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
            logger.Info("Beer ordered", watermill.LogFields{
                "room_id": event.RoomId,
            })
            return nil
        }),
    )
    if err != nil {
        panic(err)
    }

    // publish BookRoom commands every second to simulate incoming traffic
    go publishCommands(commandBus)

    // processors are based on router, so they will work when router will start
    if err := router.Run(context.Background()); err != nil {
        panic(err)
    }
}
// ...

To wszystko. Mamy działającą aplikację CQRS.