পরিচিতি

মিডলওয়্যার ব্যবহৃত হয় ইভেন্ট ফ্রেমওয়ার্ক এক্সটেন্ড করার জন্য, কাস্টম ফাংশনালিটি যোগ করার জন্য, এবং অটোরা লজিক সম্পর্কিত গুরুত্বপূর্ণ ফাংশনালিটি প্রদান করার জন্য ব্যবহৃত। উদাহরণস্বরূপ, ত্রুটি রিটার্ন করার পরে নতুন চেষ্টা করা, অথবা হ্যান্ডলার ভিতরে প্যানিক থেকে মুক্তি পেতে বা স্ট্যাক ট্রেস ক্যাপচার করা।

হ্যান্ডলার মিডলওয়্যার ফাংশনের সিগনেচার নিম্নলিখিত ভাবে সংজ্ঞায়িত হয়:

পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware আমাদেরকে হ্যান্ডলারের মতো ডেকোরেটর লেখার জন্য অনুমতি দেয়।
// এটি হ্যান্ডলারের আগে কিছু অপারেশন সঞ্চালন করতে পারে (যেমন, কনসিউম মেসেজ পরিবর্তন করা)
// এবং এর পরেও কিছু অপারেশন সঞ্চালন করতে পারে (বিপ্রোদিত মেসেজ পরিবর্তন করা, কনসিউম মেসেজকে ACK/NACK করা, ত্রুটি হ্যান্ডলিং করা, লগিং ইত্যাদি)।
//
// এটি রাউটারে এ্ডমিডডমিডলওয়্যারারে লাগি ব্যবহার করা যাবে `AddMiddleware` মেথড ব্যবহার করে।
//
// উদাহরণ:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("হ্যান্ডলার সঞ্চালন করার আগে")
//			প্রোডিউস মেসেজ, ত্রুটি := h(message)
//			fmt.Println("হ্যান্ডলার সঞ্চালন করার পরে")
//
//			প্রডিউস মেসেজ, ত্রুটি রিটার্ন করা
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

ব্যবহার

মিডলওয়্যার গুলি রাউটারের সমস্ত হ্যান্ডলারের জন্য প্রযোগ করা যেতে পারে বা নির্দিষ্ট হ্যান্ডলারের জন্য। যখন মিডলওয়্যার রাউটারে সরাসরি যোগ করা হয়, তখন রাউটারের জন্য সরবরাহ করা সমস্ত হ্যান্ডলারে প্রযোগ করা হবে। যদি কোনও মিডলওয়্যার শুধুমাত্র নির্দিষ্ট হ্যান্ডলারের জন্য প্রযোগ করা হয়, তবে তা রাউটারে হ্যান্ডলারে যোগ করতে হবে।

এখানে একটি ব্যবহারের উদাহরণ:

পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SIGTERM সিগনাল পেলে, SignalsHandler সম্মানিতভাবে রাউটার বন্ধ করবে।
	// রাউটারকে আপনি `r.Close()` কল করেও বন্ধ করতে পারেন
	router.AddPlugin(plugin.SignalsHandler)

	// রাউটার-স্তরের মিডলওয়্যারগুলি রাউটারে প্রেরিত সমস্ত বার্তায় ব্যাপক হবে
	router.AddMiddleware(
		// CorrelationID প্রেরিত বার্তার মেটাডেটা থেকে সঙ্গী বার্তার কর্লেশন আইডি কপি করবে
		middleware.CorrelationID,

		// যদি হ্যান্ডলার থেকে একটি ত্রুটি প্রেরণ করে, তবে এটি পূনঃপ্রয়াস করা হবে
		// এটি সর্বাধিক ম্যাক্সটাইম সময় MaxRetries মাঝে, পরে বার্তাটি PubSub দ্বারা Nacked এবং পুনরায় প্রেরিত হবে
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer হ্যান্ডলারে প্যানিক সম্পর্কে বিষয়ান করে
		// এই মামলায়, এটি তাদেরকে পুনঃপ্রয়াস হিসেবে ত্রুটি দিবে Retry মিডলওয়্যারে
		middleware.Recoverer,
	)

	// সহজতার জন্য, আমরা এখানে গোচ্যানেল পাব / সাব ব্যবহার করি,
	// আপনি এটি যে কোনও পাব / সাব আমলেন্টেশন দিয়ে তার জন্য প্রতিস্থাপন করতে পারেন এবং এটি একইভাবে কাজ করবে।
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// স্বাগতসূচক ভাবে কিছু প্রেরণ বার্তাগুলি পাশে পাঠানো
	go publishMessages(pubSub)

	// AddHandler একটি হ্যান্ডলার ফিরে পাঠানোর জন্য ব্যবহার করা যায়
	// বা হ্যান্ডলার বন্ধ করতে।
	handler := router.AddHandler(
		"struct_handler",          // হ্যান্ডলার নাম, অবশ্যই অনন্য
		"incoming_messages_topic", // থিম যাতে ইভেন্ট পড়া হবে
		pubSub,
		"outgoing_messages_topic", // থিম যাতে ইভেন্ট প্রকাশ হবে
		pubSub,
		structHandler{}.Handler,
	)

	// হ্যান্ডলার-স্তরের মিডলওয়্যারটি শুধুমাত্র নির্দিষ্ট হ্যান্ডলার জন্য প্রয়োগ করা হয়
	// এমন মিডলওয়্যার হ্যান্ডলারে ঠিকই রাউটার-স্তরের মিডলওয়্যার যেভাবে যোগ করা যায়
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Executing handler-specific middleware for", message.UUID)

			return h(message)
		}
	})

	// বাগবিচারের উদ্দেশ্যে মাত্র, আমরা সবগুলি বার্তা পাই - `incoming_messages_topic`
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// বাগবিচারের উদ্দেশ্যে মাত্র, আমরা সব ইভেন্ট পাঠাই - `outgoing_messages_topic`
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// এখন যেসব হ্যান্ডলার নিবন্ধিত হয়েছে, সেসব হ্যান্ডলার আমরা রান করতে পারি।
	// রান রাউটার বন্ধ না হওয়া পর্যন্ত ব্লক করবে।
// ...

উপলব্ধ মিডলওয়্যার

এখানে Watermill দ্বারা প্রদান করা পুনর্ব্যবহারযোগ্য মিডলওয়্যারগুলি রয়েছে, এবং আপনি সহজেই আপনার নিজের মিডলওয়্যার বাস্তবায়ন করতে পারেন। উদাহরণস্বরূপ, যদি আপনি চান প্রতিটি ইনকামিং বার্তাকে একটি নির্দিষ্ট ধরণের লগ ফরম্যাটে সংরক্ষণ করতে, তবে এটি এটি সেরা উপায়।

সার্কিট ব্রেকার

// CircuitBreaker হল এমন একটি মিডলওয়্যার যা হ্যান্ডলারের কোম্পোনি দিয়ো সাক্ষর দিয়ে রখে।
// কনফিগারেশনের উপর ভিত্তি করে, সার্কিট ব্রেকার আগে থেকেই ত্রুটি রিটার্ন করলে দ্রুত ব্রেক করবে।
// এটা খুব দরকারি যখন এডুড়ি ব্রেকডাউন প্রবণ করা যায়।
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker নতুন CircuitBreaker মিডলওয়্যার রিটার্ন করে।
// উপলব্ধ সেটিংসের জন্য, gobreaker ডকুমেন্টেশনে দেখুন।
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware CircuitBreaker মিডলওয়্যার রিটার্ন করে।
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

সম্পর্ক

// SetCorrelationID মেসেজের জন্য সম্পর্ক আইডি সেট করে।
//
// যখন একটি মেসেজ সিস্টেমে প্রবেশ করে, SetCorrelationID কল করা উচিত।
// যখন একটি মেসেজ অনুরোধে (যেমন, HTTP) তৈরি হয়, মেসেজের সম্পর্ক আইডি অনুরোধের সম্পর্ক আইডির সাথে একই হতে হবে।
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID মেসেজ থেকে সম্পর্ক আইডি রিটার্ন করে।
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID হ্যান্ডলার দ্বারা প্রস্তুতিকৃত সকল মেসেজে একটি সম্পর্ক আইডি যুক্ত করে।
// এই আইডি হ্যান্ডলার দ্বারা প্রাপ্ত মেসেজ আইডির উপর ভিত্তি করে।
//
// CorrelationID সঠিকভাবে কাজ করার জন্য, প্রথমে মেসেজটি সিস্টেমে প্রবেশ করার জন্য SetCorrelationID কল করা আবশ্যক।
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

ডুপ্লিকেটর

// Duplicator বারবারিকরণ করে যাতে অ্যান্ডপয়েন্ট বিশ্রামিত হয়।
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

ভুল অগ্রাহ্য করুন

// IgnoreErrors হ্যান্ডলারের নিরাপত্তা দিয়ে বিশেষ কিছু গলাম অগ্রাহ্য করার একটি মিডলওয়্যার সরবরাহ করে।
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors নতুন IgnoreErrors মিডলওয়্যার তৈরি করে।
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware IgnoreErrors মিডলওয়্যার রিটার্ন করে।
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

তাৎক্ষণিক প্রমাণ করুন

// InstantAck হ্যান্ডলারের দ্বারা আসা মেসেজকে তাৎক্ষণিকভাবে গ্রহণ করে, যে কোনও ভুলের কোনও পরিচয়ে।
// এটি দ্রুততা উন্নত করার জন্য ব্যবহার করা যায়, তবে লেনদেন হ্রাসের বিনিময় হল:
// যদি আপনি নিশ্চিতভাবে একই-একবার বিতরণ নিতে চান, তবে ন্যূনতম একবার বিতরণ পেতে পারেন।
// যদি আপনি ক্রমিকভাবে মেসেজ প্রয়োজন হয়, তবে এর ক্রমবর্ধমান ভদ্রতা বিচ্ছেদ হতে পারে।
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

বিষ

// PoisonQueue একটি মিডলওয়্যার সরবরাহ করে যা অপ্রক্রিয় বার্তা নিয়ন্ত্রণ করতে এবং তাদেরকে একটি পৃথক বিষয় প্রকাশ করতে সাহায্য করে।
// তারপরে, প্রধান মিডলওয়্যার শ্রেণী চালিয়ে যায়, এবং ব্যবসা সাধারণভাবে চালিত হয়।
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter বিষয়ীয় এমনকি হয়, কিন্তু বিষের সারিয়ের মান নির্ধারণ করা একটি ফাংশন গ্রহণ করে।
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

এলোমেলী ব্যর্থতা

// র‍্যান্ডম পরীক্ষা মধ্যের হ্যান্ডলার ব্যর্থ করা কারণের ভিত্তিতে কার্যকরী। ত্রুটির সম্ভাবনা অবশ্যই পরিসীমা (0, 1) এর মধ্যে হতে হবে।
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("একটি এলোমেলী ত্রুটি ঘটেছে")
			}
			return h(message)
		}
	}
}

// RandomPanic এলোমেলী প্যানিক কারণের ভিত্তিতে হ্যান্ডলারকে প্যানিক করায়। প্যানিক সম্ভাবনা অবশ্যই পরিসীমা (0, 1) এর মধ্যে হতে হবে।
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("একটি এলোমেলী প্যানিক ঘটেছে")
			}
			return h(message)
		}
	}
}

পুনঃঅর্জিত করা

// RecoveredPanicError রিকভার প্যানিক এর ত্রুটি এবং এর স্ট্যাক ট্রেস তথ্য ধারণ করে।
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer কার্যকারি থেকে প্যানিক মুক্ত করে এবং কোনও ত্রুটি এর সাথে প্রিভল প্যানিক ত্রুটি ফেরুন যুক্ত করে।
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

পুনঃচেষ্টা

// Retry একটি মিডলওয়্যার সরবরাহ করে যদি একটি ত্রুটি ফিরিয়ে আসে।
// পুনঃচেষ্টা আচরণ, বৃদ্ধি পেট, এবং সর্বাধিক অবস্থান সময় কনফিগার করা যায়।
type Retry struct {
	// MaxRetries করা অনেক প্রচেষ্টা এবং শেষ এর মৌলিক সময় সেট করা। পরবর্তী ইন্টারভেল গুনিতক হবে।
	MaxRetries int

	// InitialInterval হ'ল পুনরাবৃত্তির মধ্যবর্তী আন্তর। পরবর্তী ইন্টারভেল মাল্টিপ্লায়ার দ্বারা স্কেল করা হবে।
	InitialInterval time.Duration
	// MaxInterval পুনরাবৃত্তির জন্য বৃদ্ধির সর্বোচ্চ সীমা নির্ধারণ করে।
	MaxInterval time.Duration
	// Multiplier হ'ল যাৰ মাধ্যমে পুনঃচেষ্টা মধ্যে অপেক্ষা আন্তর গুণিত হবে।
	Multiplier float64
	// MaxElapsedTime পুনরাবৃত্তির জন্য সর্বাধিক সময় সীমা নির্ধারণ করে। যদি 0, তবে এটা অক্ষম হবে।
	MaxElapsedTime time.Duration
	// RandomizationFactor এক্ষেত্রে পেছনের সংখ্যার মধ্যে ব্যাখ্যা বাড়িয়ে দেবে, যা এক্ষেত্রে অনুমোদিত হবে:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook হ'ল প্রতিটি পুনরাবৃত্তি প্রয়াসে সালাম থাকার জন্য একটি ঐচ্ছিক ফাংশন।
	// বর্তমান পুনশ্চেষ্টা সংখ্যা জন্য পুনশ্চেষ্টা নিয়ে পারা হয়।
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware দায়িত্ব পরিবর্তন করে Retry মিডলওয়্যার প্রদান করে।
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

থ্রটল

// Throttle একটি মিডলওয়্যার সরবরাহ করে যেটি নির্ধারিত সময়কের মধ্যে প্রসেস করা বার্তা সংখ্যা সীমিত করে।
// এটি ব্যবহার করা যায় যাতে অপ্রক্রিয় দীর্ঘ সারি চালিত ক্ষমতাধারী উপর ভারোচ্চারন বিরোধ করতে।
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle একটি নতুন Throttle মিডলওয়ের তৈরি করে।
// উদাহরণ সময় এবং গণনা: NewThrottle(10, time.Second) দেখা যায় 10 বার্তা প্রতি সেকেন্ড।
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware দায়িত্ব পরিবর্তন করে Throttle মিডলওয়্যার প্রদান করে।
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// Throttles shared by multiple handlers will wait for their "ticks".

সময় সীমা

// সময়সীমা নির্দিষ্ট সময় পরে আসা বার্তার সংদেশের সংদেশ ঘন্টা রিসেট করে।
// যদি হ্যান্ডলারের যে কোনও সময় সাজগুরুবাহি প্রভাবশালীতা msg.Context().Done() এ শোনা উচিত।
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}