Message
Message is one of the core parts of Watermill. Messages are published by publishers and received by subscribers. Once a message has been processed, you should send an Ack() (indicating successful processing) or a Nack() (indicating processing failure).
The Ack and Nack of a message are handled by subscribers (in the default implementations, subscribers wait for an Ack or a Nack).
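The rule above (Ack on success, Nack on failure) could be sketched as follows. The names `acker`, `fakeMessage`, `handle`, and `errBoom` are hypothetical and introduced only for illustration. `acker` lists just the `Ack() bool` and `Nack() bool` methods documented on this page, so a Watermill message should satisfy it; `fakeMessage` is a stand-in that lets the sketch run without any Pub/Sub.

```go
package main

import (
	"errors"
	"fmt"
)

// acker is a hypothetical interface with the two acknowledgment methods.
type acker interface {
	Ack() bool
	Nack() bool
}

// fakeMessage is a stand-in that records which acknowledgment was sent first.
type fakeMessage struct {
	sent string
}

// Ack records an acknowledgment unless a Nack was already sent.
func (f *fakeMessage) Ack() bool {
	if f.sent == "nack" {
		return false
	}
	f.sent = "ack"
	return true
}

// Nack records a negative acknowledgment unless an Ack was already sent.
func (f *fakeMessage) Nack() bool {
	if f.sent == "ack" {
		return false
	}
	f.sent = "nack"
	return true
}

// errBoom is a sample processing error used by the demo.
var errBoom = errors.New("boom")

// handle runs process and acknowledges the message according to the result.
func handle(msg acker, process func() error) error {
	if err := process(); err != nil {
		msg.Nack() // processing failed
		return err
	}
	msg.Ack() // processing succeeded
	return nil
}

func main() {
	ok := &fakeMessage{}
	_ = handle(ok, func() error { return nil })
	fmt.Println(ok.sent) // ack

	failed := &fakeMessage{}
	_ = handle(failed, func() error { return errBoom })
	fmt.Println(failed.sent) // nack
}
```

Keeping the acknowledgment in one place, right after the processing result is known, makes it harder to leave a message without either an Ack or a Nack.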
Full source code: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
type Message struct {
	// UUID is the unique identifier of the message.
	//
	// It is used for debugging by Watermill.
	// UUID can be empty.
	UUID string

	// Metadata contains the metadata of the message.
	//
	// Can be used to store data that does not need to be decoded from the entire payload.
	// It is similar to HTTP request headers.
	//
	// Metadata will be marshaled and saved to PubSub.
	Metadata Metadata

	// Payload is the payload of the message.
	Payload Payload

	// ack is closed when an acknowledgment is received.
	ack chan struct{}
	// noAck is closed when a negative acknowledgment is received.
	noAck chan struct{}

	ackMutex    sync.Mutex
	ackSentType ackType

	ctx context.Context
}
// ...
Ack
Sending Ack
Full source code: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Ack sends an acknowledgment for the message.
//
// Ack does not block.
// Ack is idempotent.
// Returns false if Nack has already been sent.
func (m *Message) Ack() bool {
// ...
Nack
Full source code: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Nack sends a negative acknowledgment for the message.
//
// Nack does not block.
// Nack is idempotent.
// Returns false if Ack has already been sent.
func (m *Message) Nack() bool {
// ...
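The documented properties of Ack and Nack (non-blocking, idempotent, and returning false once the opposite acknowledgment has been sent) can be obtained by closing a channel under a mutex. The sketch below is a simplified model of that idea, not Watermill's actual source; `ackState`, `newAckState`, and the constant names are hypothetical, and only the field roles mirror the struct shown earlier.

```go
package main

import (
	"fmt"
	"sync"
)

// ackType records which acknowledgment, if any, has been sent.
type ackType int

const (
	noAckSent ackType = iota
	ackSent
	nackSent
)

// ackState is a hypothetical, simplified model of a message's acknowledgment fields.
type ackState struct {
	ack   chan struct{} // closed when an acknowledgment is sent
	noAck chan struct{} // closed when a negative acknowledgment is sent
	mu    sync.Mutex
	sent  ackType
}

func newAckState() *ackState {
	return &ackState{ack: make(chan struct{}), noAck: make(chan struct{})}
}

// Ack closes the ack channel at most once and never blocks.
func (s *ackState) Ack() bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	switch s.sent {
	case nackSent:
		return false // a Nack was already sent
	case ackSent:
		return true // idempotent: nothing left to do
	}
	s.sent = ackSent
	close(s.ack)
	return true
}

// Nack closes the noAck channel at most once and never blocks.
func (s *ackState) Nack() bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	switch s.sent {
	case ackSent:
		return false // an Ack was already sent
	case nackSent:
		return true // idempotent: nothing left to do
	}
	s.sent = nackSent
	close(s.noAck)
	return true
}

func main() {
	s := newAckState()
	fmt.Println(s.Ack())  // true
	fmt.Println(s.Ack())  // true
	fmt.Println(s.Nack()) // false
}
```

Closing a channel, rather than sending on it, lets any number of receivers observe the acknowledgment without the sender ever waiting for them.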
Receiving Ack/Nack
Full source code: github.com/ThreeDotsLabs/watermill/docs/content/docs/message/receiving-ack.go
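// ...
select {
case <-msg.Acked():
	// the message was acknowledged
case <-msg.Nacked():
	// the message was negatively acknowledged
}
// ...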
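A receiver often should not wait forever for an acknowledgment. The sketch below waits on two acknowledgment channels with a timeout; `waitForAck` and `defaultTimeout` are hypothetical names. It uses plain channels so that it runs on its own; with a Watermill message, the two channels would presumably come from `msg.Acked()` and `msg.Nacked()`.

```go
package main

import (
	"fmt"
	"time"
)

// defaultTimeout is an arbitrary value chosen for this sketch.
const defaultTimeout = time.Second

// waitForAck blocks until one of the channels is closed or the timeout elapses.
func waitForAck(acked, nacked <-chan struct{}, timeout time.Duration) string {
	select {
	case <-acked:
		return "acked"
	case <-nacked:
		return "nacked"
	case <-time.After(timeout):
		return "timeout"
	}
}

func main() {
	acked := make(chan struct{})
	nacked := make(chan struct{})

	// Simulate a handler that acknowledges the message from another goroutine.
	go func() { close(acked) }()

	fmt.Println(waitForAck(acked, nacked, defaultTimeout))
}
```

Whether a timeout should count as a failure is a design decision for the subscriber; this sketch only reports it.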
Context
Messages contain the standard library's context, just like an HTTP request.
Full source code: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Context returns the context of the message. To change the context, use SetContext.
//
// Returned context is always non-nil; default is the background context.
func (m *Message) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

// SetContext sets the provided context as the message's context.
func (m *Message) SetContext(ctx context.Context) {
	m.ctx = ctx
}
// ...
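As with an HTTP request, the message context can carry request-scoped values and cancellation. The sketch below uses a hypothetical stand-in type, `message`, that copies only the two accessors shown above so that it runs without Watermill; `ctxKey`, `requestIDKey`, `requestID`, and `withRequestID` are likewise assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// message is a hypothetical stand-in with only the context accessors.
type message struct {
	ctx context.Context
}

// Context returns the message context, defaulting to the background context.
func (m *message) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

// SetContext sets the provided context as the message's context.
func (m *message) SetContext(ctx context.Context) {
	m.ctx = ctx
}

// ctxKey is a private key type that avoids collisions with other packages.
type ctxKey string

const requestIDKey ctxKey = "request_id"

// withRequestID attaches a cancellable context carrying id to the message.
func withRequestID(m *message, id string) context.CancelFunc {
	ctx, cancel := context.WithCancel(context.WithValue(m.Context(), requestIDKey, id))
	m.SetContext(ctx)
	return cancel
}

// requestID reads the request ID from the message context, or "" if unset.
func requestID(m *message) string {
	id, _ := m.Context().Value(requestIDKey).(string)
	return id
}

func main() {
	msg := &message{}
	fmt.Println(requestID(msg) == "") // true: nothing has been set yet

	cancel := withRequestID(msg, "req-1")
	fmt.Println(requestID(msg)) // req-1

	cancel()
	fmt.Println(msg.Context().Err()) // context canceled
}
```

Because Context never returns nil, callers can read values or check for cancellation without a nil check first.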