শুরু করার নির্দেশিকা

এই টিউটোরিয়ালে, আমরা দুটি প্রোগ্রাম, client এবং workers লিখব।

  • client.go টি ব্যাকগ্রাউন্ড ওয়ার্কার থ্রেড দ্বারা প্রক্রিয়া করা হতে ইচ্ছুক টাস্কগুলি তৈরি এবং অনুসূচিত করবে।
  • workers.go টি ক্লায়েন্ট দ্বারা তৈরি করা টাস্কগুলি হ্যান্ডেল করতে বহুপক্ষীয় ওয়ার্কার থ্রেড চালু করবে।

এই গাইড অনুমান করে, আপনি localhost:6379 এ রান করা Redis সার্ভার ব্যবহার করছেন। শুরু করার আগে, দয়া করে নিশ্চিত করুন যে Redis ইনস্টল এবং চালু আছে।

আসুন প্রথমে আমাদের দুটি প্রধান ফাইল তৈরি করি।

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

তারপর, asynq প্যাকেজটি ইনস্টল করুন।

go get -u github.com/hibiken/asynq

আমরা কোড লিখতে শুরু করার আগে, চলুন কিছু কোর টাইপ যারা এই দুটি প্রোগ্রামে ব্যবহৃত হবে তাদের পর্যালোচনা করি।

রেডিস সংযোগ বিকল্প

Asynq রেডিসকে একটি মেসেজ ব্রোকার হিসাবে ব্যবহার করে। client.go এবং workers.go উভয়ই রেডিসে পড়া এবং লিখা করার জন্য সংযোগ করতে হবে। আমরা স্থানীয়ভাবে চলার রেডিস সার্ভারের সংযোগ নির্দিষ্ট করতে RedisClientOpt ব্যবহার করব।

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // প্রয়োজন না হলে, পাসওয়ার্ড অপসারিত করা যেতে পারে
    Password: "mypassword",
    // এসাইনকিউর জন্য একটি বিশেষ ডাটাবেস নম্বর ব্যবহার করুন।
    // ডিফল্টভাবে, রেডিস ১৬ ডাটাবেস সরবরাহ করে (0 থেকে 15)
    DB: 0,
}

কার্য

asynq তে, কাজের ইউনিটগুলি একটি টাইপ নামক টাইপে এনক্যাপসুলেট করা হয়, যা ধারাবাহিকভাবে দুটি ফিল্ড: Type এবং Payload আছে।

// ধরনটি হলো একটি স্ট্রিং মান যা কাজের প্রকারটি নির্দেশক।
func (t *Task) Type() string

// পেলোডটি কাজ সম্পাদনের জন্য প্রয়োজনীয় তথ্য।
func (t *Task) Payload() []byte

এখন যখন আমরা মূল টাইপগুলির পর্যালোচনা করেছি, আসুন আমাদের প্রোগ্রাম লিখা শুরু করি।

ক্লায়েন্ট প্রোগ্রাম

client.go তে, আমরা কিছু টাস্ক তৈরি এবং এনকিউ করব।

টাস্ক তৈরি করতে, আপনি NewTask ফাংশন ব্যবহার করতে পারেন এবং টাস্কের ধরন এবং পেলোড পাস করতে পারেন।

Enqueue মেথডটি একটি টাস্ক এবং যেকোনো সংখ্যক অপশন নিয়ে নেয়। ভবিষ্যতে প্রসেসিং এর জন্য টাস্কগুলি অনুসূচিত করার জন্য একটি অপশন হিসেবে ProcessIn বা ProcessAt ব্যবহার করুন।

// ইমেল টাস্কের সম্পর্কিত পেলোড।
type EmailTaskPayload struct {
    // ইমেল প্রাপকের আইডি।
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // টাস্ক ধরন এবং পেলোড দিয়ে একটি টাস্ক তৈরি করুন।
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // টাস্ক দ্রুত প্রসেস করুন।
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] সফলভাবে টাস্ক ইনকিউ করা হয়েছে: %+v", info)

    // ২৪ ঘন্টা পরে টাস্ক প্রসেস করুন।
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] সফলভাবে টাস্ক ইনকিউ করা হয়েছে: %+v", info)
}

আমাদের ক্লায়েন্ট প্রোগ্রামের জন্য আমরা এই পর্যালোচনাটি পর্যাপ্ত আছে।

ওয়ার্কার্স প্রোগ্রাম

workers.goতে, আমরা ওয়ার্কার্স চালু করতে একটি asynq.Server উদাহরণ তৈরি করব।

NewServer ফাংশনটি RedisConnOpt এবং Config কে প্যারামিটার হিসেবে নেয়।

Config টা পদক্ষেপ গ্রহণকারী সার্ভারের কাজের প্রতিষ্ঠান সাজানোর জন্য ব্যবহৃত হয়। আপনি সমস্ত উপলব্ধ কনফিগারেশন অপশন সম্পর্কে শিখতে পারেন Config দ্য ডকুমেন্টেশন থেকে।

সুবিধার্থে, এই উদাহারণে, আমরা কেবলমাত্ সংক্রান্তভাবে ব্যবহার করবো।

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // লক্ষ্যচিহ্ন: পরবর্তী বিভাগে, আমরা কি বোধ় দিচ্ছি `handler`?
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

(*Server).Run এর প্যারামিটার হল ইন্টারফেস asynq.Handler, যা একটি মেথড ProcessTask হয়।

type Handler interface {
    // যদি টাস্কটি সাফল্যের সাথে প্রক্রিয়াজাত হয়, তবে ProcessTask নিল রিটার্ন করতে হবে।
    // যদি ProcessTask নিলো ইরর রিটার্ন করে বা একটি প্যানিক উত্পন্ন করে, তাহলে টাস্কটি পরে পুনরায় প্রয়াস করা হবে।
    ProcessTask(context.Context, *Task) error
}

হ্যান্ডলার ব্যবহার করা একটি সর্বসাধারণ পদ্ধতি হল, একটি বিভিন্ন বৈশিষ্ট্য সমগ্র লিখা এবং এটি ভরণো করা যোকা function. asynq.HandlerFunc অ্যাডাপ্টার টাইপ ব্যবহৃত করা হয়া যেতে।

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] ব্যবহারকারী %d এর ওয়েলকাম ইমেল পাঠানো হচ্ছে", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] ব্যবহারকারী %d কে রিমাইন্ডার ইমেল পাঠানো হচ্ছে", p.UserID)

    default:
        return fmt.Errorf("অপ্রত্যাশিত টাস্ক প্রকার: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // ব্যবহার কারজের জন্য `asynq.HandlerFunc` অ্যাডাপ্টার ব্যবহার করুন
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

আমরা এই হ্যান্ডালার ফাংশনের জন্য সুইচ কেস যোগ করতে চাই, কিন্তু একটি আসল অ্যাপ্লিকেশনে, প্রতিটি মামলার জন্য মোটা ভাবে যোগ করা শুধু আরও সুবিধাজনক হবে। আমরা আমাদের কোড পুনর্বিচার করার জন্য, আসুন ServeMux ব্যবহার করে আমাদের হ্যান্ডলারগুলি তৈরি করি। "net/http" প্যাকেজে থেকে ServeMux এর মতো ।, আপনি Handle বা HandleFunc কল করে হ্যান্ডলার নিবন্ধন করতে পারেন। ServeMux একটি Handler ইন্টারফেস পূর্ণ করে, তাই এটি পাস করা যাবে (*Server).Run এর জন্য।

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ব্যবহারকারী %d এর ওয়েলকাম ইমেল পাঠানো হচ্ছে", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ব্যবহারকারী %d কে রিমাইন্ডার ইমেল পাঠানো হচ্ছে", p.UserID)
    return nil
}

এখন যোক্তো আমরা প্রতিটি প্রকারের টাস্কের জন্য নিয়ন্ত্রণ করার জন্য হ্যান্ডলিং ফাংশনগুলি পুনর্বিচার করতে পারি, কিন্তু আসল অ্যাপ্লিকেশনে, আমাদের মোটাভাবে সাবধান থাকতে হবে। আমরা এই স্ট্রিং মানগুলির জন্য এবং লোড ক্ষমতা মানগুলি ইউনিকেপ করা উচিত, আমরা আমাদের কোড পুনর্গঠন করব এবং টাস্ক নির্মাণ এবং হ্যান্ডলিং থেকে বিচ্ছিন্ন একটি প্যাকেজ লিখব। আমরা শুধুমাত্র একটা প্যাকেজ তৈরি করুন এবং এটি যাাত বলো হয় task.

ম্যানুয়ালি ভেস্টাউন্ট ট্রান্সমিশনষয়নের
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// টাস্কের ধরণের তালিকা।
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// ইমেইল সংক্রান্ত যেকোন টাস্কের জন্য পেলোড।
type EmailTaskPayload struct {
    // ইমেইল প্রাপকের আইডি।
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ব্যবহারকারী %d-কে স্বাগত ইমেইল পাঠানো হচ্ছে", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ব্যবহারকারী %d-কে অনুস্মারক ইমেইল পাঠানো হচ্ছে", p.UserID)
    return nil
}

এখন client.go এ এই প্যাকেজটি ইম্পোর্ট করা যাবে client.go এ এবং workers.go এ।

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // টাস্কগুলি তাৎপরতায় ইনকিউ করুন।
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] টাস্ক সফলভাবে ইনকিউ করা হয়েছে: %+v", info)

    // ২৪ ঘণ্টার পরে প্রক্রিয়াজাত করার জন্য টাস্ক ইনকিউ করুন।
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] টাস্ক সফলভাবে ইনকিউ করা হয়েছে: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

কোডটি এখন ঠিকঠাক দেখতে চলেছে!

clientworkers প্রস্তুত হলে, আমরা এই দুটি প্রোগ্রামগুলি চালাতে পারি। যতটুকু কাজ করার জন্য client প্রোগ্রামটি চালানোর আগে আমরা শুরু করি।

go run client/client.go

এটি দুটি কাজ তৈরি করবে: একটি তাৎপর্যে প্রসেসিং এবং আরেকটি ২৪ ঘণ্টার পরে প্রসেসিং করার জন্য।

আসুন asynq কমান্ড লাইন ইন্টারফেস ব্যবহার করে টাস্কগুলি পরীক্ষা করা যাক।

asynq dash

আপনি দেখতে পারবেন যে Enqueued অবস্থায় একটি টাস্ক এবং Scheduled অবস্থায় আরেকটি টাস্ক আছে।

নোট: প্রতিটি অবস্থার অর্থ বোঝার জন্য, দয়া করে টাস্ক জীবনচক্র দেখুন।

শেষবার, টাস্কগুলি প্রসেস করার জন্য workers প্রোগ্রামটি চালু করা যাক।

go run workers/workers.go

নোট: এই প্রোগ্রামটি প্রস্থান করা না পর্যন্ত একটি সিগন্যাল পাঠানো না করা পর্যন্ত বন্ধ হবে না। পটভংকামূলক প্রচারণার সম্পর্কে সেরা অনুশীলনগুলি জানতে, দয়া করে সিগন্যাল উইকি পেজ দেখুন।

আপনি দেখতে পারবেন যে টার্মিনালে কিছু টেক্সট আউটপুট দেখা যাচ্ছে, যাতে টাস্কগুলি সফলভাবে প্রসেস করা হয়েছে।

আপনি আবার client প্রোগ্রামটি চালিয়ে দেখতে পারবেন যেভাবে ওইগুলি টাস্ক গ্রহণ করে এবং তাদের প্রসেস করে।

প্রথম প্রচেষ্টায় একটি টাস্ক সফলভাবে প্রসেস করা হতে না পারা টাস্ক কোনও অসফল অবস্থায় ফেলে যাওয়া খুব অসম্ভাব। ডিফল্টভাবে, ব্যাকঅফমগুলি সহ পুনৰাবৃত্তি হবে দ্বিগুণ প্রচুর। আমাদের হ্যান্ডলারটি আপডেট করতে একটি ত্রুটি রিটার্ন করার মতো ভুল শিটুয়েশনটি সহজভাবে প্রতিপ্রারিত করি।

// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ব্যবহারকারী %d কে স্বাগতম ইমেইল পাঠানোর চেষ্টা করা হচ্ছে...", p.UserID)
    return fmt.Errorf("ব্যবহারকারীকে ইমেইল পাঠানো ব্যর্থ হয়েছে")
}

আসুন আমাদের ওয়ার্কারস প্রোগ্রাম পুনরায় আরম্ভ করি এবং একটি টাস্ক ইনকিউ করি।

go run workers/workers.go
go run client/client.go

যদি আপনি asynq ড্যাশ চালু আছেন, তবে আপনি দেখতে পারবেন একটি টাস্ক Retry অবস্থায় (কিউ বিস্তার দেখার জন্য নির্দেশনা বিচার অংশটি উদ্ধার করে "রিট্রাই" ট্যাবে হাইলাইট করার মাধ্যমে)।

রিট্রাই অবস্থায় কোন টাস্ক কোনও চেক করতে, আপনি নিম্নলিখিত কমান্ড চালাতে পারেন:

asynq task ls --queue=default --state=retry

এটি ভবিষ্যতে রিট্রাই করা হবে সমস্ত টাস্কগুলি তালিকা করবে। আউটপুট তার প্রতি টাস্কের প্রত্যাশিত পরবর্তী সময় সহ থাকবে।

একবার একটি টাস্ক আপনার রিট্রাই প্রয়াস শেষ হয়ে যায়, তা অবস্থান প্রয়ানকর পরিবর্তন করবে এবং আরো রিট্রাই করা হবে না (আপনি এখনও CLI বা WebUI সরঞ্জাম ব্যবহার করে অভ্যন্তরীণভাবে সংরক্ষিত করা টাস্কগুলি পরিচালনা করতে পারেন)।

এই টিউটোরিয়াল সমাপ্তি হতে প্রারম্ভ করার আগে, চলুন আমাদের হ্যান্ডলারটি ঠিক করি।

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ব্যবহারকারী %d কে স্বাগতম ইমেইল পাঠানো হচ্ছে", p.UserID)
    return nil 
}