カスタムパブリッシャー/サブスクライバーインターフェース

カスタムパブリッシャー/サブスクライバーをサポートするためには、message.Publisherおよびmessage.Subscriberインターフェースを実装する必要があります。

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Publisher interface {
    // Publishは指定されたトピックにメッセージを公開します。
    //
    // Publisherの実装に応じて、Publishは同期的または非同期的に実行される場合があります。
    //
    // ほとんどのパブリッシャーの実装は、メッセージの原子的な公開をサポートしていません。
    // つまり、1つのメッセージの公開に失敗した場合、次のメッセージは公開されません。
    //
    // Publishはスレッドセーフである必要があります。
    Publish(topic string, messages ...*Message) error
    // パブリッシャーが非同期の場合、Closeは未送信のメッセージをフラッシュする必要があります。
    Close() error
}

// サブスクライバーはパブリッシャー/サブスクライバーの消費部分です。
type Subscriber interface {
    // Subscribeは、指定されたトピックから受信したメッセージの出力チャネルを返します。
    // Close()がサブスクライバーで呼ばれるとチャネルが閉じられます。
    //
    // 次のメッセージを受信するには、受信したメッセージにAck()を呼び出す必要があります。
    // メッセージの処理に失敗し、メッセージを再配信する必要がある場合は、Nack()を呼び出す必要があります。
    //
    // 提供されたctxがキャンセルされると、サブスクライバーは購読を閉じ、出力チャネルも閉じます。
    // 提供されたctxは、生成されたすべてのメッセージに設定されます。
    // メッセージでNackまたはAckが呼び出されると、メッセージのコンテキストがキャンセルされます。
    Subscribe(ctx context.Context, topic string) (

タスクリスト

以下は忘れてはいけないいくつかのポイントです:

  1. ロギング(適切なレベルとメッセージ)。
  2. 可読性が高く、設定可能なメッセージエンコーダーの置換。
  3. パブリッシャーやサブスクライバーのClose()の実装は以下であるべきです:
    • 冪等性を持つこと
    • パブリッシャーやサブスクライバーがブロックされている場合に正常に動作すること(例:Ackを待っている)
    • 出力チャネルがブロックされている場合に正常に動作すること(誰もリスニングしていないため)
  4. 受信したメッセージのAck()Nack()のサポート。
  5. Nack()を使用して購読されたメッセージの再配信のサポート。
  6. 汎用的なパブリッシャー/サブスクライバーのテストを使用する。デバッグのヒントについては、テストのトラブルシューティングガイドを参照してください。
  7. パフォーマンスの最適化。
  8. GoDocs、Markdownドキュメント、そして導入的な例。