パブリッシャー

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Publisher interface {
	// Publishは指定されたトピックにメッセージを公開します。
	//
	// Publishは同期的または非同期的に行われる場合があります。実装に依存します。
	//
	// ほとんどのパブリッシャーの実装は、メッセージのアトミックなパブリッシングをサポートしません。
	// これはつまり、メッセージの公開に失敗した場合、次のメッセージは公開されません。
	//
	// Publishはスレッドセーフである必要があります。
	Publish(topic string, messages ...*Message) error
	// パブリッシャーが非同期的な場合、Closeは未送信のメッセージをフラッシュします。
	Close() error
}
// ...

複数のメッセージを公開する

ほとんどのパブリッシャーの実装は、メッセージのアトミックなパブリッシングをサポートしません。これはつまり、メッセージの公開に失敗した場合、次のメッセージは公開されません。

非同期的な公開

公開は同期的または非同期的に行われる場合があります。実装に依存します。

Close()

パブリッシャーが非同期的な場合、Close()は未送信のメッセージをフラッシュする必要があります。購読者を閉じるのを忘れないで。そうしないと、いくつかのメッセージが失われる可能性があります。

サブスクライバー

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Subscriber interface {
	// 提供されたトピックからメッセージを含む出力チャネルを返します。
	// 購読者のClose()が呼び出されると、チャネルが閉じます。
	//
	// 次のメッセージを受信するには、受信したメッセージで`Ack()`を呼び出す必要があります。
	// もしメッセージの処理が失敗し、メッセージを再配信する必要がある場合は、`Nack()`を呼ぶべきです。
	//
	// 提供されたctxがキャンセルされると、購読者は購読を閉じ、出力チャネルを閉じます。
	// 提供されたctxは生成されたすべてのメッセージに設定されます。
	// AckまたはNackがメッセージで呼び出された場合、そのメッセージのコンテキストはキャンセルされます。
	Subscribe(ctx context.Context, topic string) (
}

Ack/Nack メカニズム

サブスクライバーはメッセージからのAckおよびNackの処理を担当しています。適切な実装では、次のメッセージを消費する前にAckまたはNackを待機すべきです。

重要な購読者の実装のポイント: WatermillメッセージのAck後に、Ack/オフセットをメッセージのストレージ/エージェントに送信することが重要です。そうしないと、メッセージの処理の前にプロセスが終了した場合、メッセージが失われる可能性があります。

Close()

Closeはすべての購読とその出力チャネルを閉じ、必要に応じてオフセットをフラッシュします。

少なくとも1回の配信

Watermillは少なくとも1回の配信セマンティクスを使用して構築されています。つまり、メッセージの処理中にエラーが発生し、Ackを送信することができない場合、メッセージは再配信されます。

このことを考慮して、アイデンポテントな処理のためにアプリケーションを構築するか、再試行メカニズムを実装する必要があります。

残念ながら、汎用的な再試行ミドルウェアを作成することはできませんので、独自の実装を構築することをお勧めします。

汎用テスト

すべてのパブサブはほとんど同じです。各パブサブ実装のために別々のテストを記述する必要を避けるために、任意のパブサブがパスする必要があるテストスイートを作成しました。

これらのテストはpubsub/tests/test_pubsub.goで見つけることができます。

組み込み実装

利用可能なパブサブの実装を確認するには、サポートされているパブサブを参照してください。

カスタムパブサブの実装

新しいパブサブのサポートを導入する手順については、「カスタムパブサブの実装」を参照してください。