カスタムパブリッシャー/サブスクライバーインターフェース
カスタムパブリッシャー/サブスクライバーをサポートするためには、message.Publisher
およびmessage.Subscriber
インターフェースを実装する必要があります。
完全なソースコード: github.com/ThreeDotsLabs/watermill/message/pubsub.go
// ...
type Publisher interface {
// Publishは指定されたトピックにメッセージを公開します。
//
// Publisherの実装に応じて、Publishは同期的または非同期的に実行される場合があります。
//
// ほとんどのパブリッシャーの実装は、メッセージの原子的な公開をサポートしていません。
// つまり、1つのメッセージの公開に失敗した場合、次のメッセージは公開されません。
//
// Publishはスレッドセーフである必要があります。
Publish(topic string, messages ...*Message) error
// パブリッシャーが非同期の場合、Closeは未送信のメッセージをフラッシュする必要があります。
Close() error
}
// サブスクライバーはパブリッシャー/サブスクライバーの消費部分です。
type Subscriber interface {
// Subscribeは、指定されたトピックから受信したメッセージの出力チャネルを返します。
// Close()がサブスクライバーで呼ばれるとチャネルが閉じられます。
//
// 次のメッセージを受信するには、受信したメッセージにAck()を呼び出す必要があります。
// メッセージの処理に失敗し、メッセージを再配信する必要がある場合は、Nack()を呼び出す必要があります。
//
// 提供されたctxがキャンセルされると、サブスクライバーは購読を閉じ、出力チャネルも閉じます。
// 提供されたctxは、生成されたすべてのメッセージに設定されます。
// メッセージでNackまたはAckが呼び出されると、メッセージのコンテキストがキャンセルされます。
Subscribe(ctx context.Context, topic string) (
タスクリスト
以下は忘れてはいけないいくつかのポイントです:
- ロギング(適切なレベルとメッセージ)。
- 可読性が高く、設定可能なメッセージエンコーダーの置換。
- パブリッシャーやサブスクライバーの
Close()
の実装は以下であるべきです:- 冪等性を持つこと
- パブリッシャーやサブスクライバーがブロックされている場合に正常に動作すること(例:Ackを待っている)
- 出力チャネルがブロックされている場合に正常に動作すること(誰もリスニングしていないため)
- 受信したメッセージの
Ack()
とNack()
のサポート。 -
Nack()
を使用して購読されたメッセージの再配信のサポート。 - 汎用的なパブリッシャー/サブスクライバーのテストを使用する。デバッグのヒントについては、テストのトラブルシューティングガイドを参照してください。
- パフォーマンスの最適化。
- GoDocs、Markdownドキュメント、そして導入的な例。