Hướng dẫn Bắt đầu

Trong hướng dẫn này, chúng ta sẽ viết hai chương trình, clientworkers.

  • client.go sẽ tạo và lên lịch các nhiệm vụ để xử lý một cách bất đồng bộ bởi các luồng worker nền.
  • workers.go sẽ bắt đầu nhiều luồng worker đồng thời để xử lý các nhiệm vụ được tạo bởi client.

Hướng dẫn này giả định rằng bạn đang chạy máy chủ Redis trên localhost:6379. Trước khi bắt đầu, vui lòng đảm bảo rằng Redis đã được cài đặt và đang chạy.

Hãy trước tiên tạo hai tệp chính của chúng ta.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Sau đó, cài đặt gói asynq.

go get -u github.com/hibiken/asynq

Trước khi chúng ta bắt đầu viết code, hãy xem xét một số loại cốt lõi mà chúng ta sẽ sử dụng trong hai chương trình này.

Tùy chọn Kết nối Redis

Asynq sử dụng Redis như một trình trung gian tin nhắn. Cả client.goworkers.go đều cần kết nối với Redis để thực hiện các hoạt động đọc và ghi. Chúng ta sẽ sử dụng RedisClientOpt để chỉ định kết nối đến máy chủ Redis đang chạy cục bộ.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Mật khẩu có thể bị bỏ qua nếu không cần thiết
    Password: "mypassword",
    // Sử dụng một database số đặc biệt cho asynq.
    // Theo mặc định, Redis cung cấp 16 cơ sở dữ liệu (0 đến 15).
    DB: 0,
}

Nhiệm vụ

Trong asynq, các đơn vị công việc được đóng gói trong một loại gọi là Task, mà theo quan niệm có hai trường: TypePayload.

// Type là một giá trị chuỗi chỉ định loại của nhiệm vụ.
func (t *Task) Type() string

// Payload là dữ liệu cần thiết để thực hiện nhiệm vụ.
func (t *Task) Payload() []byte

Bây giờ khi chúng ta đã xem xét các loại cốt lõi, hãy bắt đầu viết chương trình của chúng ta.

Chương trình Client

Trong client.go, chúng ta sẽ tạo một số nhiệm vụ và đưa chúng vào hàng đợi sử dụng asynq.Client.

Để tạo một nhiệm vụ, bạn có thể sử dụng hàm NewTask và truyền loại và payload của nhiệm vụ.

Phương thức Enqueue có thể nhận một nhiệm vụ và bất kỳ số lượng tùy chọn nào. Sử dụng tùy chọn ProcessIn hoặc ProcessAt để lên lịch các nhiệm vụ cho các lần xử lý trong tương lai.

// Payload liên quan đến các nhiệm vụ email.
type EmailTaskPayload struct {
    // ID của người nhận email.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Tạo một nhiệm vụ với tên loại và payload.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Xử lý các nhiệm vụ ngay lập tức.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Đưa thành công nhiệm vụ vào hàng đợi: %+v", info)

    // Xử lý các nhiệm vụ sau 24 giờ.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Đưa thành công nhiệm vụ vào hàng đợi: %+v", info)
}

Đây là tất cả những gì chúng ta cần cho chương trình client của chúng ta.

Chương trình Workers

Trong workers.go, chúng ta sẽ tạo một thể hiện của asynq.Server để khởi động các workers.

Hàm NewServer nhận vào tham số RedisConnOptConfig.

Config được sử dụng để điều chỉnh hành vi xử lý công việc của server. Bạn có thể tham khảo tài liệu Config để tìm hiểu về tất cả các tùy chọn cấu hình có sẵn.

Đơn giản ở ví dụ này, chúng tôi chỉ xác định sự đồng thời.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Lưu ý: Trong phần tiếp theo, chúng tôi sẽ giới thiệu về `handler`.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

Tham số của phương thức (*Server).Run là một giao diện asynq.Handler, có một phương thức ProcessTask.

type Handler interface {
    // Nếu công việc được xử lý thành công, ProcessTask nên trả về nil.
    // Nếu ProcessTask trả về một lỗi không phải nil hoặc gây ra một sự cố, công việc sẽ được thử lại sau này.
    ProcessTask(context.Context, *Task) error
}

Cách đơn giản nhất để triển khai một handler là định nghĩa một hàm có cùng chữ ký và sử dụng kiểu adapter asynq.HandlerFunc khi truyền nó vào Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Gửi email chào mừng đến người dùng %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Gửi email nhắc nhở đến người dùng %d", p.UserID)

    default:
        return fmt.Errorf("Loại công việc không mong đợi: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Sử dụng kiểu adapter asynq.HandlerFunc để xử lý hàm
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Chúng ta có thể tiếp tục thêm các trường hợp switch cho hàm xử lý này, nhưng trong một ứng dụng thực tế, sẽ thuận tiện hơn để định nghĩa logic cho mỗi trường hợp trong một hàm riêng biệt.

Để tái cấu trúc mã của chúng ta, hãy sử dụng ServeMux để tạo giải pháp xử lý của chúng ta. Giống như ServeMux từ gói "net/http", bạn có thể đăng ký một handler bằng cách gọi Handle hoặc HandleFunc. ServeMux đáp ứng giao diện Handler, vì vậy nó có thể được truyền vào (*Server).Run.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Gửi email chào mừng đến người dùng %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Gửi email nhắc nhở đến người dùng %d", p.UserID)
    return nil
}

Bây giờ, khi chúng ta đã trích xuất các hàm xử lý cho mỗi loại công việc, mã trông có tổ chức hơn. Tuy nhiên, mã vẫn hơi ngụ ý. Chúng ta có các giá trị chuỗi cho loại công việc và loại dữ liệu payload, và chúng ta nên đóng gói chúng trong một gói hữu cơ. Hãy tái cấu trúc mã của chúng ta và viết một gói để đóng gói việc tạo và xử lý công việc. Chúng ta chỉ cần tạo một gói có tên là task.

mkdir task && touch task/task.go
package task

import (
	"context"
	"encoding/json"
	"log"

	"github.com/hibiken/asynq"
)

// Danh sách loại công việc.
const (
	TypeWelcomeEmail  = "email:welcome"
	TypeReminderEmail = "email:reminder"
)

// Dữ liệu đầu vào cho bất kỳ công việc liên quan đến email nào.
type EmailTaskPayload struct {
	// ID của người nhận email.
	UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Gửi email chào mừng tới người dùng %d", p.UserID)
	return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Gửi email nhắc nhở tới người dùng %d", p.UserID)
	return nil
}

Bây giờ chúng ta có thể import gói này trong client.goworkers.go.

```go
// client.go
func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

	t1, err := task.NewWelcomeEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	t2, err := task.NewReminderEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	// Đưa công việc vào hàng đợi ngay lập tức.
	info, err := client.Enqueue(t1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] Công việc được đưa vào hàng đợi thành công: %+v", info)

	// Đưa công việc vào hàng đợi để xử lý sau 24 giờ.
	info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] Công việc được đưa vào hàng đợi thành công: %+v", info)
}
// workers.go
func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{Concurrency: 10},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
	mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

	if err := srv.Run(mux); err != nil {
		log.Fatal(err)
	}
}

Code trông tốt hơn rồi đó!

Bây giờ chúng ta đã có clientworkers sẵn sàng, chúng ta có thể chạy hai chương trình này. Hãy bắt đầu bằng cách chạy chương trình client để tạo và lên lịch các công việc.

go run client/client.go

Điều này sẽ tạo ra hai công việc: một cho việc xử lý ngay lập tức và một cái khác cho việc xử lý sau 24 giờ.

Hãy sử dụng giao diện dòng lệnh asynq để kiểm tra các công việc.

asynq dash

Bạn sẽ thấy một công việc ở trạng thái Đưa vào hàng đợi và một công việc ở trạng thái Đã lên lịch.

Chú ý: Để hiểu ý nghĩa của mỗi trạng thái, vui lòng tham khảo Cẩm nang Vòng đời công việc.

Cuối cùng, hãy bắt đầu chạy chương trình workers để xử lý các công việc.

go run workers/workers.go

Chú ý: Chương trình này sẽ không thoát cho đến khi bạn gửi một tín hiệu để kết thúc nó. Để biết các nguyên tắc tốt nhất về cách chấm dứt an toàn cho các tiến trình chạy ngầm, vui lòng tham khảo trang Signals Wiki.

Bạn sẽ thấy một số đầu ra văn bản trên cửa sổ terminal, cho biết việc xử lý các công việc đã được thực hiện thành công.

Bạn có thể chạy chương trình client một lần nữa để xem cách các workers chấp nhận và xử lý chúng.

Không phải là điều hiếm gặp khi một công việc không thành công khi xử lý lần đầu tiên. Theo mặc định, các công việc không thành công sẽ được thử lại 25 lần với sự trì hoãn theo quy luật mũ. Hãy cập nhật handler của chúng ta để trả về một lỗi để mô phỏng một tình huống không thành công.

// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Đang cố gắng gửi email chào mừng đến người dùng %d...", p.UserID)
    return fmt.Errorf("Gửi email đến người dùng thất bại")
}

Hãy khởi động lại chương trình workers và đưa một task vào hàng đợi.

go run workers/workers.go
go run client/client.go

Nếu bạn đang chạy asynq dash, bạn nên thấy một task ở trạng thái Retry (bằng cách điều hướng đến xem chi tiết hàng đợi và làm nổi bật tab "retry").

Để kiểm tra xem bản ghi nào đang ở trạng thái retry, bạn cũng có thể chạy:

asynq task ls --queue=default --state=retry

Điều này sẽ liệt kê tất cả các tasks sẽ được thử lại vào tương lai. Kết quả sẽ bao gồm thời gian dự kiến cho lần thực thi tiếp theo của mỗi task.

Khi một task đã cạn kiệt tất cả số lần thử, nó sẽ chuyển sang trạng thái Archived và sẽ không được thử lại nữa (bạn vẫn có thể chạy tasks đã được lưu trữ bằng cách sử dụng công cụ CLI hoặc WebUI).

Trước khi kết thúc hướng dẫn này, hãy sửa lại handler của chúng ta.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Đang gửi email chào mừng đến người dùng %d", p.UserID)
    return nil 
}