트랜잭션 내에서 메시지 발행의 중요성
이벤트 주도 애플리케이션을 다룰 때는 애플리케이션 상태를 영속화하고 시스템의 다른 부분에게 방금 발생한 일에 대해 알리기 위해 메시지를 발행해야 할 때가 있습니다. 이상적인 시나리오에서는 애플리케이션 상태를 영속화하고 단일 트랜잭션 내에서 메시지를 발행하고 싶습니다. 이렇게 하지 않으면 데이터 일관성 문제가 발생할 수 있기 때문입니다. 데이터 저장 및 이벤트 발행을 동시에 처리하기 위해서는 데이터 저장에 사용된 동일한 데이터베이스에 메시지를 발행하거나 이차 커밋(2PC) 메커니즘을 구현해야 합니다. 만약 메시지 브로커를 데이터베이스로 변경하거나 뻥튀기를 원치 않는다면, Watermill의 전달자 구성 요소를 사용하여 작업을 간소화할 수 있습니다!
전달자 구성 요소
전달자는 데이터베이스에 발행되는 메시지를 기다리는 백그라운드 데몬으로 생각할 수 있으며, 이를 통해 메시지가 최종적으로 메시지 브로커에 도착하도록 보장합니다.
전달자를 일반적이고 투명하게 사용하기 위해, 중간 데이터베이스의 전용 주제에서 수신하고, 캡슐화된 메시지를 보내기 위해 장식된 전달자 발행자를 사용합니다. 전달자는 이를 해제하고 메시지를 지정된 대상 주제로 메시지 브로커에 보냅니다.
예시
다음 예시를 살펴보겠습니다: 복권 추첨을 수행하는 책임이 있는 명령이 있다고 가정해봅시다. 시스템에서 등록된 사용자를 무작위로 선택하여 우승자로 지정해야 합니다. 이 과정에서 고유한 복권 ID와 선택된 사용자 ID를 연결하는 데이터베이스 항목을 저장함으로써 결정을 영속화해야 합니다. 또한, 이벤트 주도 시스템으로써 복권완료
이벤트를 발행하여 다른 구성 요소가 적절하게 반응할 수 있도록 해야 합니다. 구체적으로는 복권 우승자에게 상금을 보내는 책임이 있는 구성 요소가 있을 것입니다. 이 구성 요소는 복권완료
이벤트를 받고 데이터베이스 항목을 사용하여 우승자를 결정합니다.
우리의 시나리오에서 데이터베이스는 MySQL이고 메시지 브로커는 Google Pub/Sub이지만, 다른 두 가지 기술일 수도 있습니다.
이러한 명령을 구현할 때, 다양한 접근 방식을 취할 수 있습니다. 다음에서는 세 가지 가능한 시도를 소개하고 그들의 단점을 지적하겠습니다.
이벤트 발행 후 데이터 저장
이 접근 방식에서 명령은 먼저 이벤트를 발행한 다음 데이터를 저장합니다. 이 방법은 대부분의 경우에 올바르게 작동할 수 있지만, 잠재적인 문제를 식별해 보겠습니다.
명령은 세 가지 기본 작업을 실행해야 합니다:
- 무작위 사용자
A
를 선택하여 당첨자로 선정합니다. -
LotteryConcluded
이벤트를 발행하여B
로또가 종료되었음을 알립니다. - 데이터베이스에
B
로또가 사용자A
에 의해 당첨되었다고 저장합니다.
각 단계는 실패 가능성이 있어 우리의 명령 흐름을 방해할 수 있습니다. 첫 번째 단계가 실패하면 심각한 결과는 나타나지 않을 것입니다. 우리는 그저 오류를 반환하고 전체 명령을 실패로 간주할 뿐입니다. 데이터가 저장되지 않고 이벤트가 발행되지 않을 것이며, 명령을 다시 실행할 수 있습니다.
두 번째 단계가 실패하면 아직 이벤트를 발행하지 않았거나 데이터를 데이터베이스에 저장하지 않았습니다. 우리는 명령을 다시 실행하고 다시 시도할 수 있습니다.
가장 흥미로운 시나리오는 세 번째 단계가 실패하는 경우입니다. 두 번째 단계 이후에 이미 이벤트를 발행했지만 최종적으로 데이터가 데이터베이스에 저장되지 않을 것입니다. 다른 구성 요소들은 로또가 종료되었다는 신호를 받겠지만, 이벤트에서 전송된 로또 ID에 당첨자가 연결되지 않을 것입니다. 그들은 당첨자를 확인할 수 없으므로 그들의 작업도 실패로 간주해야 합니다.
그러한 상황에서 또한 탈출할 수 있지만, 이미 전송된 이벤트에서 로또 ID를 사용하여 명령을 다시 실행하는 등 수동 작업이 필요할 수 있습니다.
// ...
// 1. 먼저 Google Cloud Pub/Sub에 이벤트를 발행한 다음 MySQL에 데이터를 저장합니다.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// 만약 오류가 발생하면, 이벤트는 전송되었지만 데이터는 아직 저장되지 않았습니다.
if err = simulateError(); err != nil {
logger.Error("데이터를 저장하는 데 실패했습니다", err, nil)
return err
}
_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
// ...
데이터 저장 후 이벤트 발행
두 번째 접근 방식에서는 먼저 주소 방식의 단점을 해결하려고 합니다. 데이터베이스에 상태가 정확하게 유지되지 않고 이벤트가 발행되지 않을 때 외부 구성 요소로 실패 상황이 노출되는 것을 방지하기 위해 다음과 같이 조치 순서를 변경할 것입니다.
- 무작위로 사용자
A
를 복권B
의 당첨자로 선정합니다. - 복권
B
가 사용자A
에 의해 획득되었음을 데이터베이스에 유지합니다. - 복권
B
가 종료되었음을 알리는복권종료
이벤트를 발행합니다.
첫 번째 방법과 마찬가지로 첫 두 단계가 실패하면 우리에게는 어떤 결과도 없습니다. 세 번째 단계가 실패하는 경우 데이터는 데이터베이스에 유지되지만 이벤트가 발행되지 않을 것입니다. 이 경우 복권 외부의 구성 요소로 실패 상황이 노출되지는 않겠지만, 예상되는 시스템 동작을 고려할 때 당첨자는 책임 있는 구성 요소에 이 작업을 수행하기 위해 이벤트를 전달받지 못하게 될 것입니다.
이 문제는 수동 작업으로도 해결할 수 있으니, 즉, 이벤트를 수동으로 발행하는 것입니다. 하지만 더 나은 방법이 있습니다.
// ...
// 2. MySQL에 데이터를 먼저 유지한 후 Google Cloud Pub/Sub로 이벤트를 직접 발행합니다.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: projectID,
},
logger,
)
if err != nil {
return err
}
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
// 이 부분이 실패하면 데이터는 이미 유지되었지만 이벤트가 발행되지 않습니다.
if err = simulateError(); err != nil {
logger.Error("이벤트를 발행하지 못했습니다", err, nil)
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
// ...
트랜잭션 내 데이터 저장 및 이벤트 발행
우리의 명령이 동시에 두 번째와 세 번째 작업을 수행할 수 있다고 상상해보십시오. 이들은 원자적으로 커밋되며, 한 작업이 실패하면 다른 작업은 성공할 수 없습니다. 대부분의 데이터베이스에 이미 구현된 트랜잭션 메커니즘을 활용하여 이를 달성할 수 있습니다. 우리 예제에서 사용하는 MySQL도 이러한 데이터베이스 중 하나입니다.
트랜잭션 내에서 데이터를 저장하고 이벤트를 발행하기 위해서는 MySQL에 메시지를 발행할 수 있어야 합니다. 전체 시스템에 걸쳐 MySQL이 지원하는 메시지 브로커로 변경하고 싶지는 않기 때문에, 이를 달성하기 위한 다른 방법을 찾아야 합니다.
좋은 소식은: Watermill은 모든 필요한 도구를 제공합니다! MySQL, PostgreSQL(또는 다른 모든 SQL 데이터베이스), Firestore 또는 Bolt와 같은 데이터베이스를 사용하고 있다면, 해당 데이터베이스로 메시지를 발행할 수 있습니다. Forwarder 구성 요소는 데이터베이스에 발행하는 모든 메시지를 선택하고 이를 메시지 브로커로 전달하는 데 도움이 될 것입니다.
다음을 보장해야 합니다:
- 귀하의 명령이 데이터베이스 트랜잭션 컨텍스트에서 작동하는 퍼블리셔를 사용합니다(예: SQL, Firestore, Bolt).
- Forwarder 구성 요소가 데이터베이스 서브스크라이버와 메시지 브로커 퍼블리셔를 사용하여 실행 중인지 확인합니다.
이러한 경우에는 명령이 다음과 같이 보일 수 있습니다:
Forwarder 구성 요소를 백그라운드에서 작동하도록 설정하고 MySQL에서 Google Pub/Sub으로 메시지를 전달하려면 다음과 같이 설정해야 합니다:
전체 코드는 다음을 참조하십시오: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go
// ...
// 데이터를 MySQL에 영구 저장하고 트랜잭션 내에서 Google Cloud Pub/Sub로 이벤트를 발행합니다.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
tx.Commit()
} else {
logger.Info("에러로 인해 트랜잭션을 롤백합니다", watermill.LogFields{"error": err.Error()})
// 에러가 발생한 경우, MySQL 트랜잭션 롤백으로 인해 다음과 같은 원치 않는 시나리오가 발생하지 않도록 합니다:
// - 이벤트가 발행되었지만 데이터가 영구 저장되지 않은 경우,
// - 데이터가 영구 저장되었지만 이벤트가 발행되지 않은 경우.
tx.Rollback()
}
}()
_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
if err != nil {
return err
}
var publisher message.Publisher
publisher, err = sql.NewPublisher(
tx,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
return err
}
// Forwarder 구성 요소가 이해할 수 있는 래퍼로 퍼블리셔를 장식합니다.
publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
ForwarderTopic: forwarderSQLTopic,
})
// 복권 종료 이벤트를 발행합니다. 여기서 주의해야 할 점은 이제 장식된 MySQL 퍼블리셔를 사용하여 Google Cloud 토픽에 발행된다는 것입니다.
event := LotteryConcludedEvent{LotteryID: lotteryID}
payload, err := json.Marshal(event)
if err != nil {
return err
}
err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
if err != nil {
return err
}
return nil
// ...
이 예제에 대해 더 알고 싶다면, [여기](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/transactional-events-forwarder/main.go)에서 구현을 찾아보세요.