শুরু করার নির্দেশিকা
এই টিউটোরিয়ালে, আমরা দুটি প্রোগ্রাম, client
এবং workers
লিখব।
-
client.go
টি ব্যাকগ্রাউন্ড ওয়ার্কার থ্রেড দ্বারা প্রক্রিয়া করা হতে ইচ্ছুক টাস্কগুলি তৈরি এবং অনুসূচিত করবে। -
workers.go
টি ক্লায়েন্ট দ্বারা তৈরি করা টাস্কগুলি হ্যান্ডেল করতে বহুপক্ষীয় ওয়ার্কার থ্রেড চালু করবে।
এই গাইড অনুমান করে, আপনি localhost:6379
এ রান করা Redis সার্ভার ব্যবহার করছেন। শুরু করার আগে, দয়া করে নিশ্চিত করুন যে Redis ইনস্টল এবং চালু আছে।
আসুন প্রথমে আমাদের দুটি প্রধান ফাইল তৈরি করি।
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
তারপর, asynq
প্যাকেজটি ইনস্টল করুন।
go get -u github.com/hibiken/asynq
আমরা কোড লিখতে শুরু করার আগে, চলুন কিছু কোর টাইপ যারা এই দুটি প্রোগ্রামে ব্যবহৃত হবে তাদের পর্যালোচনা করি।
রেডিস সংযোগ বিকল্প
Asynq রেডিসকে একটি মেসেজ ব্রোকার হিসাবে ব্যবহার করে। client.go
এবং workers.go
উভয়ই রেডিসে পড়া এবং লিখা করার জন্য সংযোগ করতে হবে। আমরা স্থানীয়ভাবে চলার রেডিস সার্ভারের সংযোগ নির্দিষ্ট করতে RedisClientOpt
ব্যবহার করব।
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// প্রয়োজন না হলে, পাসওয়ার্ড অপসারিত করা যেতে পারে
Password: "mypassword",
// এসাইনকিউর জন্য একটি বিশেষ ডাটাবেস নম্বর ব্যবহার করুন।
// ডিফল্টভাবে, রেডিস ১৬ ডাটাবেস সরবরাহ করে (0 থেকে 15)
DB: 0,
}
কার্য
asynq
তে, কাজের ইউনিটগুলি একটি টাইপ নামক টাইপে এনক্যাপসুলেট করা হয়, যা ধারাবাহিকভাবে দুটি ফিল্ড: Type
এবং Payload
আছে।
// ধরনটি হলো একটি স্ট্রিং মান যা কাজের প্রকারটি নির্দেশক।
func (t *Task) Type() string
// পেলোডটি কাজ সম্পাদনের জন্য প্রয়োজনীয় তথ্য।
func (t *Task) Payload() []byte
এখন যখন আমরা মূল টাইপগুলির পর্যালোচনা করেছি, আসুন আমাদের প্রোগ্রাম লিখা শুরু করি।
ক্লায়েন্ট প্রোগ্রাম
client.go
তে, আমরা কিছু টাস্ক তৈরি এবং এনকিউ করব।
টাস্ক তৈরি করতে, আপনি NewTask
ফাংশন ব্যবহার করতে পারেন এবং টাস্কের ধরন এবং পেলোড পাস করতে পারেন।
Enqueue
মেথডটি একটি টাস্ক এবং যেকোনো সংখ্যক অপশন নিয়ে নেয়। ভবিষ্যতে প্রসেসিং এর জন্য টাস্কগুলি অনুসূচিত করার জন্য একটি অপশন হিসেবে ProcessIn
বা ProcessAt
ব্যবহার করুন।
// ইমেল টাস্কের সম্পর্কিত পেলোড।
type EmailTaskPayload struct {
// ইমেল প্রাপকের আইডি।
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// টাস্ক ধরন এবং পেলোড দিয়ে একটি টাস্ক তৈরি করুন।
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// টাস্ক দ্রুত প্রসেস করুন।
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] সফলভাবে টাস্ক ইনকিউ করা হয়েছে: %+v", info)
// ২৪ ঘন্টা পরে টাস্ক প্রসেস করুন।
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] সফলভাবে টাস্ক ইনকিউ করা হয়েছে: %+v", info)
}
আমাদের ক্লায়েন্ট প্রোগ্রামের জন্য আমরা এই পর্যালোচনাটি পর্যাপ্ত আছে।
ওয়ার্কার্স প্রোগ্রাম
workers.go
তে, আমরা ওয়ার্কার্স চালু করতে একটি asynq.Server
উদাহরণ তৈরি করব।
NewServer
ফাংশনটি RedisConnOpt
এবং Config
কে প্যারামিটার হিসেবে নেয়।
Config
টা পদক্ষেপ গ্রহণকারী সার্ভারের কাজের প্রতিষ্ঠান সাজানোর জন্য ব্যবহৃত হয়।
আপনি সমস্ত উপলব্ধ কনফিগারেশন অপশন সম্পর্কে শিখতে পারেন Config
দ্য ডকুমেন্টেশন থেকে।
সুবিধার্থে, এই উদাহারণে, আমরা কেবলমাত্ সংক্রান্তভাবে ব্যবহার করবো।
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// লক্ষ্যচিহ্ন: পরবর্তী বিভাগে, আমরা কি বোধ় দিচ্ছি `handler`?
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
(*Server).Run
এর প্যারামিটার হল ইন্টারফেস asynq.Handler
, যা একটি মেথড ProcessTask
হয়।
type Handler interface {
// যদি টাস্কটি সাফল্যের সাথে প্রক্রিয়াজাত হয়, তবে ProcessTask নিল রিটার্ন করতে হবে।
// যদি ProcessTask নিলো ইরর রিটার্ন করে বা একটি প্যানিক উত্পন্ন করে, তাহলে টাস্কটি পরে পুনরায় প্রয়াস করা হবে।
ProcessTask(context.Context, *Task) error
}
হ্যান্ডলার ব্যবহার করা একটি সর্বসাধারণ পদ্ধতি হল, একটি বিভিন্ন বৈশিষ্ট্য সমগ্র লিখা এবং এটি ভরণো করা যোকা function.
asynq.HandlerFunc
অ্যাডাপ্টার টাইপ ব্যবহৃত করা হয়া যেতে।
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ব্যবহারকারী %d এর ওয়েলকাম ইমেল পাঠানো হচ্ছে", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ব্যবহারকারী %d কে রিমাইন্ডার ইমেল পাঠানো হচ্ছে", p.UserID)
default:
return fmt.Errorf("অপ্রত্যাশিত টাস্ক প্রকার: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// ব্যবহার কারজের জন্য `asynq.HandlerFunc` অ্যাডাপ্টার ব্যবহার করুন
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
আমরা এই হ্যান্ডালার ফাংশনের জন্য সুইচ কেস যোগ করতে চাই, কিন্তু একটি আসল অ্যাপ্লিকেশনে, প্রতিটি মামলার জন্য মোটা ভাবে যোগ করা শুধু আরও সুবিধাজনক হবে। আমরা আমাদের কোড পুনর্বিচার করার জন্য, আসুন ServeMux
ব্যবহার করে আমাদের হ্যান্ডলারগুলি তৈরি করি। "net/http"
প্যাকেজে থেকে ServeMux
এর মতো ।, আপনি Handle
বা HandleFunc
কল করে হ্যান্ডলার নিবন্ধন করতে পারেন। ServeMux
একটি Handler
ইন্টারফেস পূর্ণ করে, তাই এটি পাস করা যাবে (*Server).Run
এর জন্য।
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ব্যবহারকারী %d এর ওয়েলকাম ইমেল পাঠানো হচ্ছে", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ব্যবহারকারী %d কে রিমাইন্ডার ইমেল পাঠানো হচ্ছে", p.UserID)
return nil
}
এখন যোক্তো আমরা প্রতিটি প্রকারের টাস্কের জন্য নিয়ন্ত্রণ করার জন্য হ্যান্ডলিং ফাংশনগুলি পুনর্বিচার করতে পারি, কিন্তু আসল অ্যাপ্লিকেশনে, আমাদের মোটাভাবে সাবধান থাকতে হবে। আমরা এই স্ট্রিং মানগুলির জন্য এবং লোড ক্ষমতা মানগুলি ইউনিকেপ করা উচিত, আমরা আমাদের কোড পুনর্গঠন করব এবং টাস্ক নির্মাণ এবং হ্যান্ডলিং থেকে বিচ্ছিন্ন একটি প্যাকেজ লিখব। আমরা শুধুমাত্র একটা প্যাকেজ তৈরি করুন এবং এটি যাাত বলো হয় task
.
ম্যানুয়ালি ভেস্টাউন্ট ট্রান্সমিশনষয়নের
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// টাস্কের ধরণের তালিকা।
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// ইমেইল সংক্রান্ত যেকোন টাস্কের জন্য পেলোড।
type EmailTaskPayload struct {
// ইমেইল প্রাপকের আইডি।
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ব্যবহারকারী %d-কে স্বাগত ইমেইল পাঠানো হচ্ছে", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ব্যবহারকারী %d-কে অনুস্মারক ইমেইল পাঠানো হচ্ছে", p.UserID)
return nil
}
এখন client.go
এ এই প্যাকেজটি ইম্পোর্ট করা যাবে client.go
এ এবং workers.go
এ।
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// টাস্কগুলি তাৎপরতায় ইনকিউ করুন।
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] টাস্ক সফলভাবে ইনকিউ করা হয়েছে: %+v", info)
// ২৪ ঘণ্টার পরে প্রক্রিয়াজাত করার জন্য টাস্ক ইনকিউ করুন।
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] টাস্ক সফলভাবে ইনকিউ করা হয়েছে: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
কোডটি এখন ঠিকঠাক দেখতে চলেছে!
client
ও workers
প্রস্তুত হলে, আমরা এই দুটি প্রোগ্রামগুলি চালাতে পারি। যতটুকু কাজ করার জন্য client
প্রোগ্রামটি চালানোর আগে আমরা শুরু করি।
go run client/client.go
এটি দুটি কাজ তৈরি করবে: একটি তাৎপর্যে প্রসেসিং এবং আরেকটি ২৪ ঘণ্টার পরে প্রসেসিং করার জন্য।
আসুন asynq
কমান্ড লাইন ইন্টারফেস ব্যবহার করে টাস্কগুলি পরীক্ষা করা যাক।
asynq dash
আপনি দেখতে পারবেন যে Enqueued অবস্থায় একটি টাস্ক এবং Scheduled অবস্থায় আরেকটি টাস্ক আছে।
নোট: প্রতিটি অবস্থার অর্থ বোঝার জন্য, দয়া করে টাস্ক জীবনচক্র দেখুন।
শেষবার, টাস্কগুলি প্রসেস করার জন্য workers
প্রোগ্রামটি চালু করা যাক।
go run workers/workers.go
নোট: এই প্রোগ্রামটি প্রস্থান করা না পর্যন্ত একটি সিগন্যাল পাঠানো না করা পর্যন্ত বন্ধ হবে না। পটভংকামূলক প্রচারণার সম্পর্কে সেরা অনুশীলনগুলি জানতে, দয়া করে সিগন্যাল উইকি পেজ দেখুন।
আপনি দেখতে পারবেন যে টার্মিনালে কিছু টেক্সট আউটপুট দেখা যাচ্ছে, যাতে টাস্কগুলি সফলভাবে প্রসেস করা হয়েছে।
আপনি আবার client
প্রোগ্রামটি চালিয়ে দেখতে পারবেন যেভাবে ওইগুলি টাস্ক গ্রহণ করে এবং তাদের প্রসেস করে।
প্রথম প্রচেষ্টায় একটি টাস্ক সফলভাবে প্রসেস করা হতে না পারা টাস্ক কোনও অসফল অবস্থায় ফেলে যাওয়া খুব অসম্ভাব। ডিফল্টভাবে, ব্যাকঅফমগুলি সহ পুনৰাবৃত্তি হবে দ্বিগুণ প্রচুর। আমাদের হ্যান্ডলারটি আপডেট করতে একটি ত্রুটি রিটার্ন করার মতো ভুল শিটুয়েশনটি সহজভাবে প্রতিপ্রারিত করি।
// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ব্যবহারকারী %d কে স্বাগতম ইমেইল পাঠানোর চেষ্টা করা হচ্ছে...", p.UserID)
return fmt.Errorf("ব্যবহারকারীকে ইমেইল পাঠানো ব্যর্থ হয়েছে")
}
আসুন আমাদের ওয়ার্কারস প্রোগ্রাম পুনরায় আরম্ভ করি এবং একটি টাস্ক ইনকিউ করি।
go run workers/workers.go
go run client/client.go
যদি আপনি asynq ড্যাশ
চালু আছেন, তবে আপনি দেখতে পারবেন একটি টাস্ক Retry অবস্থায় (কিউ বিস্তার দেখার জন্য নির্দেশনা বিচার অংশটি উদ্ধার করে "রিট্রাই" ট্যাবে হাইলাইট করার মাধ্যমে)।
রিট্রাই অবস্থায় কোন টাস্ক কোনও চেক করতে, আপনি নিম্নলিখিত কমান্ড চালাতে পারেন:
asynq task ls --queue=default --state=retry
এটি ভবিষ্যতে রিট্রাই করা হবে সমস্ত টাস্কগুলি তালিকা করবে। আউটপুট তার প্রতি টাস্কের প্রত্যাশিত পরবর্তী সময় সহ থাকবে।
একবার একটি টাস্ক আপনার রিট্রাই প্রয়াস শেষ হয়ে যায়, তা অবস্থান প্রয়ানকর পরিবর্তন করবে এবং আরো রিট্রাই করা হবে না (আপনি এখনও CLI বা WebUI সরঞ্জাম ব্যবহার করে অভ্যন্তরীণভাবে সংরক্ষিত করা টাস্কগুলি পরিচালনা করতে পারেন)।
এই টিউটোরিয়াল সমাপ্তি হতে প্রারম্ভ করার আগে, চলুন আমাদের হ্যান্ডলারটি ঠিক করি।
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ব্যবহারকারী %d কে স্বাগতম ইমেইল পাঠানো হচ্ছে", p.UserID)
return nil
}