Руководство по началу работы
В этом руководстве мы напишем две программы, client
и workers
.
-
client.go
будет создавать и планировать задачи для асинхронной обработки фоновыми рабочими потоками. -
workers.go
будет запускать несколько параллельных рабочих потоков для обработки задач, созданных клиентом.
Это руководство предполагает, что у вас запущен сервер Redis по адресу localhost:6379
. Прежде чем начать, убедитесь, что Redis установлен и запущен.
Давайте сначала создадим наши два основных файла.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
Затем установим пакет asynq
.
go get -u github.com/hibiken/asynq
Прежде чем мы начнем писать код, давайте рассмотрим некоторые основные типы, которые будут использоваться в этих двух программах.
Параметры подключения к Redis
Asynq использует Redis в качестве посредника сообщений. Как client.go
, так и workers.go
должны подключаться к Redis для операций чтения и записи. Мы будем использовать RedisClientOpt
для указания подключения к локально работающему серверу Redis.
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// Пароль можно опустить, если он не нужен
Password: "mypassword",
// Используйте отдельный номер базы данных для asynq.
// По умолчанию Redis предоставляет 16 баз данных (от 0 до 15).
DB: 0,
}
Задачи
В asynq
рабочие блоки инкапсулируются в типе Task
, который концептуально имеет два поля: Type
и Payload
.
// Type - это строковое значение, которое указывает тип задачи.
func (t *Task) Type() string
// Payload - данные, необходимые для выполнения задачи.
func (t *Task) Payload() []byte
Теперь, когда мы рассмотрели основные типы, давайте начнем писать наши программы.
Программа клиента
В client.go
мы создадим некоторые задачи и поместим их в очередь с помощью asynq.Client
.
Для создания задачи вы можете использовать функцию NewTask
и передать тип и полезную нагрузку задачи.
Метод Enqueue
принимает задачу и любое количество опций. Используйте опции ProcessIn
или ProcessAt
для планирования задач на будущее выполнение.
// Полезная нагрузка, связанная с задачами электронной почты.
type EmailTaskPayload struct {
// Идентификатор получателя письма.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// Создание задачи с именем типа и полезной нагрузкой.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// Обработка задач немедленно.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Задача успешно помещена в очередь: %+v", info)
// Обработка задач через 24 часа.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Задача успешно помещена в очередь: %+v", info)
}
Этого достаточно для нашей программы клиента.
Программа работников
В файле workers.go
мы создадим экземпляр asynq.Server
, чтобы запустить работников.
Функция NewServer
принимает в качестве параметров RedisConnOpt
и Config
.
Config
используется для настройки поведения обработки задач сервером.
Вы можете обратиться к документации по Config
, чтобы узнать о всех доступных опциях конфигурации.
Для простоты, в этом примере мы указываем только параллельность.
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Примечание: В следующем разделе мы расскажем, что такое `handler`.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
Параметр метода (*Server).Run
является интерфейсом asynq.Handler
, у которого есть метод ProcessTask
.
type Handler interface {
// Если задача обработана успешно, ProcessTask должен возвращать nil.
// Если ProcessTask возвращает ошибку или вызывает панику, то задача будет повторно обработана позже.
ProcessTask(context.Context, *Task) error
}
Самый простой способ реализовать обработчик - определить функцию с тем же сигнатурой и использовать адаптер типа asynq.HandlerFunc
при передаче ее в Run
.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Отправка приветственного письма пользователю %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Отправка напоминания пользователю %d", p.UserID)
default:
return fmt.Errorf("Неожиданный тип задачи: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Используйте адаптер asynq.HandlerFunc для обработки функции
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
Можем продолжать добавлять в эту функцию-обработчик другие случаи использования, но в реальном приложении будет удобнее определить логику для каждого случая в отдельной функции.
Для рефакторинга нашего кода воспользуемся ServeMux
для создания нашего обработчика. Как и ServeMux
из пакета "net/http"
, вы можете зарегистрировать обработчик, вызвав Handle
или HandleFunc
. ServeMux
удовлетворяет интерфейсу Handler
, поэтому его можно передать в (*Server).Run
.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Отправка приветственного письма пользователю %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Отправка напоминания пользователю %d", p.UserID)
return nil
}
Теперь, когда мы извлекли функции обработки для каждого типа задачи, код выглядит более организованным. Однако код все еще немного неявен. У нас есть строки для типов задач и типов полезной нагрузки, и их следует инкапсулировать в органический пакет. Давайте преобразуем наш код и напишем пакет для инкапсуляции создания и обработки задач. Просто создадим пакет с именем task
.
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// Список типов задач.
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// Payload для любой задачи, связанной с электронной почтой.
type EmailTaskPayload struct {
// Идентификатор получателя электронной почты.
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Отправка приветственного письма пользователю %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Отправка напоминания пользователю %d", p.UserID)
return nil
}
Теперь мы можем импортировать этот пакет в client.go
и workers.go
.
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// Немедленно помещаем задачу в очередь.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Задача успешно помещена в очередь: %+v", info)
// Помещаем задачу в очередь для обработки через 24 часа.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Задача успешно помещена в очередь: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
Теперь код выглядит более читаемым!
Теперь, когда у нас есть client
и workers
, мы можем запустить эти две программы. Давайте начнем с запуска программы client
, чтобы создать и запланировать задачи.
go run client/client.go
Это создаст две задачи: одну для немедленной обработки и другую для обработки через 24 часа.
Давайте воспользуемся интерфейсом командной строки asynq
, чтобы просмотреть задачи.
asynq dash
Вы должны увидеть одну задачу в состоянии Enqueued и другую задачу в состоянии Scheduled.
Примечание: Чтобы понять значение каждого состояния, обратитесь к Жизненному циклу задачи.
Наконец, давайте запустим программу workers
для обработки задач.
go run workers/workers.go
Примечание: Эта программа не завершится, пока вы явно не отправите сигнал для завершения. Для лучших практик по безопасному завершению фоновых рабочих процессов обратитесь к странице вики Сигналы.
Вы должны увидеть некоторый текстовый вывод в терминале, указывающий на успешную обработку задач.
Вы можете снова запустить программу client
, чтобы увидеть, как рабочие процессы принимают и обрабатывают их.
Не редко задача не удается обработаться успешно с первой попытки. По умолчанию неудачные задачи будут повторно попытаны 25 раз с экспоненциальной задержкой. Давайте обновим наш обработчик, чтобы вернуть ошибку и смоделировать неудачную ситуацию.
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Попытка отправить приветственное письмо пользователю %d...", p.UserID)
return fmt.Errorf("Не удалось отправить письмо пользователю")
}
Давайте перезапустим нашу программу для обработки задач и добавим задачу в очередь.
go run workers/workers.go
go run client/client.go
Если вы используете asynq dash
, вы должны увидеть задачу в состоянии Retry (перейдя в вид очереди и выделив вкладку "retry").
Чтобы проверить, какие задачи находятся в состоянии retry, вы также можете выполнить:
asynq task ls --queue=default --state=retry
Это покажет все задачи, которые будут повторно попытаны в будущем. Результат будет включать ожидаемое время следующего выполнения для каждой задачи.
Как только задача исчерпает свои попытки повтора, она перейдет в состояние Archived и больше не будет повторно выполняться (вы по-прежнему можете запускать архивированные задачи вручную с помощью инструментов CLI или WebUI).
Прежде чем завершить этот учебник, давайте исправим наш обработчик.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Отправка приветственного письма пользователю %d", p.UserID)
return nil
}