Pengantar
Middleware digunakan untuk memperluas kerangka kerja acara, menambahkan fungsionalitas kustom, dan menyediakan fungsionalitas penting yang tidak terkait dengan logika penangan utama. Misalnya, mengulangi penangan setelah mengembalikan kesalahan, atau memulihkan dari kepanikan dan menangkap jejak tumpukan dalam penangan.
Tanda tangan fungsi middleware didefinisikan sebagai berikut:
Kode sumber lengkap: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware memungkinkan kita untuk menulis dekorator yang serupa dengan penangan.
// Itu dapat menjalankan beberapa operasi sebelum penangan (misalnya, memodifikasi pesan yang dikonsumsi)
// dan juga melakukan beberapa operasi setelah penangan (memodifikasi pesan yang diproduksi, ACK/NACK pesan yang dikonsumsi, menangani kesalahan, logging, dll.).
//
// Itu dapat dilampirkan ke router dengan menggunakan metode `AddMiddleware`.
//
// Contoh:
//
// func ExampleMiddleware(h HandlerFunc) HandlerFunc {
// return func(message *Message) ([]*Message, error) {
// fmt.Println("Sebelum menjalankan penangan")
// pesanDiproduksi, err := h(pesan)
// fmt.Println("Setelah menjalankan penangan")
//
// return pesanDiproduksi, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Penggunaan
Middleware dapat diterapkan pada semua penangan di router atau pada penangan tertentu. Ketika middleware ditambahkan langsung ke router, itu akan diterapkan pada semua penangan yang disediakan untuk router. Jika sebuah middleware hanya diterapkan pada penangan tertentu, itu perlu ditambahkan ke penangan di router.
Berikut adalah contoh penggunaan:
Kode Sumber Lengkap: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Ketika menerima sinyal SIGTERM, SignalsHandler akan menutup router dengan baik.
// Anda juga dapat menutup router dengan memanggil `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Middleware tingkat router akan dieksekusi pada setiap pesan yang dikirim ke router
router.AddMiddleware(
// CorrelationID akan menyalin ID korelasi dari metadata pesan masuk ke pesan yang dihasilkan
middleware.CorrelationID,
// Jika penangan mengembalikan kesalahan, itu akan diulang.
// Ini akan diulang paling banyak MaxRetries kali, setelah itu pesan akan Nacked dan dikirim ulang oleh PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer menangani panic dalam penangan.
// Dalam kasus ini, itu memperlakukan mereka sebagai kesalahan ke middleware Retry.
middleware.Recoverer,
)
// Untuk kesederhanaan, kami menggunakan gochannel Pub/Sub di sini,
// Anda dapat menggantinya dengan implementasi Pub/Sub apa pun dan itu akan berfungsi sama.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Menerbitkan beberapa pesan masuk di latar belakang
go publishMessages(pubSub)
// AddHandler mengembalikan penangan yang dapat digunakan untuk menambahkan middleware tingkat penangan
// atau untuk menghentikan penangan.
handler := router.AddHandler(
"struct_handler", // Nama penangan, harus unik
"incoming_messages_topic", // Topik dari mana kejadian akan dibaca
pubSub,
"outgoing_messages_topic", // Topik ke mana kejadian akan diterbitkan
pubSub,
structHandler{}.Handler,
)
// Middleware tingkat penangan hanya dieksekusi untuk penangan tertentu
// Middleware seperti itu dapat ditambahkan ke penangan dengan cara yang sama dengan middleware tingkat router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Menjalankan middleware khusus penangan untuk", message.UUID)
return h(message)
}
})
// Hanya untuk tujuan debugging, kami mencetak semua pesan yang diterima pada `incoming_messages_topic`
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Hanya untuk tujuan debugging, kami mencetak semua kejadian yang dikirim ke `outgoing_messages_topic`
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Sekarang setelah semua penangan telah didaftarkan, kami dapat menjalankan router.
// Run akan memblokir hingga router berhenti berjalan.
// ...
Middleware Tersedia
Berikut adalah middleware yang dapat digunakan ulang yang disediakan oleh Watermill, dan Anda juga dapat dengan mudah mengimplementasikan middleware sendiri. Misalnya, jika Anda ingin menyimpan setiap pesan masuk dalam jenis format log tertentu, ini adalah cara terbaik untuk melakukannya.
Circuit Breaker
// CircuitBreaker adalah suatu middleware yang melingkupi handler dalam suatu circuit breaker.
// Berdasarkan konfigurasi, circuit breaker akan gagal cepat jika handler terus mengembalikan kesalahan.
// Ini berguna untuk mencegah kegagalan yang menciptakan efek domino.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker mengembalikan sebuah middleware CircuitBreaker baru.
// Untuk pengaturan yang tersedia, silakan merujuk ke dokumentasi gobreaker.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware mengembalikan middleware CircuitBreaker.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
Korelasi
// SetCorrelationID menetapkan ID korelasi untuk pesan.
//
// Ketika sebuah pesan memasuki sistem, SetCorrelationID harus dipanggil.
// Ketika sebuah pesan dihasilkan dalam sebuah permintaan (misalnya, HTTP), ID korelasi pesan harus sama dengan ID korelasi permintaan.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID mengembalikan ID korelasi dari pesan.
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID menambahkan sebuah ID korelasi ke semua pesan yang dihasilkan oleh handler.
// ID didasarkan pada ID pesan yang diterima oleh handler.
//
// Agar CorrelationID dapat bekerja dengan benar, SetCorrelationID harus dipanggil terlebih dahulu agar pesan dapat memasuki sistem.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
Pengganda
// Duplicator memproses pesan dua kali untuk memastikan titik akhir bersifat idempoten.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
Abaikan Kesalahan
// IgnoreErrors menyediakan sebuah middleware yang memungkinkan handler untuk mengabaikan beberapa kesalahan yang secara eksplisit didefinisikan.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors membuat sebuah middleware IgnoreErrors baru.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware mengembalikan middleware IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
Pengakuan Instan
// InstantAck membuat handler segera mengakui pesan masuk, terlepas dari adanya kesalahan.
// Ini dapat digunakan untuk meningkatkan throughput, namun dengan kompensasi:
// Jika Anda perlu memastikan pengiriman tepat sekali, Anda mungkin mendapatkan setidaknya satu pengiriman.
// Jika Anda memerlukan pesan terurut, ini dapat memecahkan urutan.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
Racun
// PoisonQueue menyediakan fitur middleware untuk menangani pesan yang tidak dapat diproses dan menerbitkannya ke topik terpisah.
// Lalu, rantai middleware utama tetap melanjutkan eksekusi, dan bisnis berjalan seperti biasa.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter mirip dengan PoisonQueue, tetapi menerima fungsi untuk menentukan kesalahan mana yang memenuhi kriteria antrian racun.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
Kegagalan Acak
// RandomFail menyebabkan handler gagal berdasarkan probabilitas acak. Probabilitas kesalahan harus berada dalam rentang (0, 1).
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("terjadi kesalahan acak")
}
return h(message)
}
}
}
// RandomPanic menyebabkan handler mengalami panic berdasarkan probabilitas acak. Probabilitas panic harus berada dalam rentang (0, 1).
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("terjadi panic acak")
}
return h(message)
}
}
}
Recoverer
// RecoveredPanicError menyimpan error panic yang dipulihkan dan informasi jejak tumpukannya.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer memulihkan setiap panic dari handler dan melampirkan RecoveredPanicError beserta jejak tumpukan ke setiap kesalahan yang dikembalikan dari handler.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
Coba Lagi
// Retry menyediakan middleware yang akan mencoba ulang penangan jika terjadi kesalahan.
// Perilaku pengulangan, keterlambatan eksponensial, dan waktu maksimum yang diperbolehkan dapat dikonfigurasi.
type Retry struct {
// MaxRetries adalah jumlah maksimum percobaan yang akan dilakukan.
MaxRetries int
// InitialInterval adalah interval awal antara percobaan ulang. Interval selanjutnya akan diperbesar oleh Multiplier.
InitialInterval time.Duration
// MaxInterval menetapkan batas atas untuk pengulangan keterlambatan eksponensial.
MaxInterval time.Duration
// Multiplier adalah faktor dengan mana interval tunggu antara pengulangan akan dikalikan.
Multiplier float64
// MaxElapsedTime menetapkan batas waktu maksimum untuk percobaan ulang. Jika 0, fitur ini dinonaktifkan.
MaxElapsedTime time.Duration
// RandomizationFactor secara acak menyebar waktu penundaan dalam rentang berikut:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook adalah fungsi opsional yang akan dieksekusi pada setiap percobaan ulang.
// Nomor percobaan ulang saat ini dilewatkan melalui retryNum.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware mengembalikan middleware Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
Pembatasan
// Pembatasan menyediakan middleware untuk membatasi jumlah pesan yang diproses dalam jangka waktu tertentu.
// Ini dapat digunakan untuk mencegah handler yang berjalan pada antrian yang belum diproses terlalu banyak.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle membuat middleware Pembatasan baru.
// Contoh durasi dan hitungan: NewThrottle(10, time.Second) menunjukkan 10 pesan per detik.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware mengembalikan middleware Pembatasan.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// Pembatasan yang digunakan oleh beberapa penangan akan menunggu "tik" mereka.
Batas Waktu
// Batas Waktu membatalkan konteks pesan masuk setelah durasi yang ditentukan.
// Semua fungsi handler yang sensitif terhadap batas waktu harus mendengarkan msg.Context().Done() untuk mengetahui kapan harus gagal.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}