Руководство по началу работы

В этом руководстве мы напишем две программы, client и workers.

  • client.go будет создавать и планировать задачи для асинхронной обработки фоновыми рабочими потоками.
  • workers.go будет запускать несколько параллельных рабочих потоков для обработки задач, созданных клиентом.

Это руководство предполагает, что у вас запущен сервер Redis по адресу localhost:6379. Прежде чем начать, убедитесь, что Redis установлен и запущен.

Давайте сначала создадим наши два основных файла.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Затем установим пакет asynq.

go get -u github.com/hibiken/asynq

Прежде чем мы начнем писать код, давайте рассмотрим некоторые основные типы, которые будут использоваться в этих двух программах.

Параметры подключения к Redis

Asynq использует Redis в качестве посредника сообщений. Как client.go, так и workers.go должны подключаться к Redis для операций чтения и записи. Мы будем использовать RedisClientOpt для указания подключения к локально работающему серверу Redis.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Пароль можно опустить, если он не нужен
    Password: "mypassword",
    // Используйте отдельный номер базы данных для asynq.
    // По умолчанию Redis предоставляет 16 баз данных (от 0 до 15).
    DB: 0,
}

Задачи

В asynq рабочие блоки инкапсулируются в типе Task, который концептуально имеет два поля: Type и Payload.

// Type - это строковое значение, которое указывает тип задачи.
func (t *Task) Type() string

// Payload - данные, необходимые для выполнения задачи.
func (t *Task) Payload() []byte

Теперь, когда мы рассмотрели основные типы, давайте начнем писать наши программы.

Программа клиента

В client.go мы создадим некоторые задачи и поместим их в очередь с помощью asynq.Client.

Для создания задачи вы можете использовать функцию NewTask и передать тип и полезную нагрузку задачи.

Метод Enqueue принимает задачу и любое количество опций. Используйте опции ProcessIn или ProcessAt для планирования задач на будущее выполнение.

// Полезная нагрузка, связанная с задачами электронной почты.
type EmailTaskPayload struct {
    // Идентификатор получателя письма.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Создание задачи с именем типа и полезной нагрузкой.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Обработка задач немедленно.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Задача успешно помещена в очередь: %+v", info)

    // Обработка задач через 24 часа.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Задача успешно помещена в очередь: %+v", info)
}

Этого достаточно для нашей программы клиента.

Программа работников

В файле workers.go мы создадим экземпляр asynq.Server, чтобы запустить работников.

Функция NewServer принимает в качестве параметров RedisConnOpt и Config.

Config используется для настройки поведения обработки задач сервером. Вы можете обратиться к документации по Config, чтобы узнать о всех доступных опциях конфигурации.

Для простоты, в этом примере мы указываем только параллельность.

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Примечание: В следующем разделе мы расскажем, что такое `handler`.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

Параметр метода (*Server).Run является интерфейсом asynq.Handler, у которого есть метод ProcessTask.

type Handler interface {
    // Если задача обработана успешно, ProcessTask должен возвращать nil.
    // Если ProcessTask возвращает ошибку или вызывает панику, то задача будет повторно обработана позже.
    ProcessTask(context.Context, *Task) error
}

Самый простой способ реализовать обработчик - определить функцию с тем же сигнатурой и использовать адаптер типа asynq.HandlerFunc при передаче ее в Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Отправка приветственного письма пользователю %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Отправка напоминания пользователю %d", p.UserID)

    default:
        return fmt.Errorf("Неожиданный тип задачи: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Используйте адаптер asynq.HandlerFunc для обработки функции
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Можем продолжать добавлять в эту функцию-обработчик другие случаи использования, но в реальном приложении будет удобнее определить логику для каждого случая в отдельной функции.

Для рефакторинга нашего кода воспользуемся ServeMux для создания нашего обработчика. Как и ServeMux из пакета "net/http", вы можете зарегистрировать обработчик, вызвав Handle или HandleFunc. ServeMux удовлетворяет интерфейсу Handler, поэтому его можно передать в (*Server).Run.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Отправка приветственного письма пользователю %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Отправка напоминания пользователю %d", p.UserID)
    return nil
}

Теперь, когда мы извлекли функции обработки для каждого типа задачи, код выглядит более организованным. Однако код все еще немного неявен. У нас есть строки для типов задач и типов полезной нагрузки, и их следует инкапсулировать в органический пакет. Давайте преобразуем наш код и напишем пакет для инкапсуляции создания и обработки задач. Просто создадим пакет с именем task.

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// Список типов задач.
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// Payload для любой задачи, связанной с электронной почтой.
type EmailTaskPayload struct {
    // Идентификатор получателя электронной почты.
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Отправка приветственного письма пользователю %d", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Отправка напоминания пользователю %d", p.UserID)
    return nil
}

Теперь мы можем импортировать этот пакет в client.go и workers.go.

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // Немедленно помещаем задачу в очередь.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Задача успешно помещена в очередь: %+v", info)

    // Помещаем задачу в очередь для обработки через 24 часа.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Задача успешно помещена в очередь: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

Теперь код выглядит более читаемым!

Теперь, когда у нас есть client и workers, мы можем запустить эти две программы. Давайте начнем с запуска программы client, чтобы создать и запланировать задачи.

go run client/client.go

Это создаст две задачи: одну для немедленной обработки и другую для обработки через 24 часа.

Давайте воспользуемся интерфейсом командной строки asynq, чтобы просмотреть задачи.

asynq dash

Вы должны увидеть одну задачу в состоянии Enqueued и другую задачу в состоянии Scheduled.

Примечание: Чтобы понять значение каждого состояния, обратитесь к Жизненному циклу задачи.

Наконец, давайте запустим программу workers для обработки задач.

go run workers/workers.go

Примечание: Эта программа не завершится, пока вы явно не отправите сигнал для завершения. Для лучших практик по безопасному завершению фоновых рабочих процессов обратитесь к странице вики Сигналы.

Вы должны увидеть некоторый текстовый вывод в терминале, указывающий на успешную обработку задач.

Вы можете снова запустить программу client, чтобы увидеть, как рабочие процессы принимают и обрабатывают их.

Не редко задача не удается обработаться успешно с первой попытки. По умолчанию неудачные задачи будут повторно попытаны 25 раз с экспоненциальной задержкой. Давайте обновим наш обработчик, чтобы вернуть ошибку и смоделировать неудачную ситуацию.

// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Попытка отправить приветственное письмо пользователю %d...", p.UserID)
    return fmt.Errorf("Не удалось отправить письмо пользователю")
}

Давайте перезапустим нашу программу для обработки задач и добавим задачу в очередь.

go run workers/workers.go
go run client/client.go

Если вы используете asynq dash, вы должны увидеть задачу в состоянии Retry (перейдя в вид очереди и выделив вкладку "retry").

Чтобы проверить, какие задачи находятся в состоянии retry, вы также можете выполнить:

asynq task ls --queue=default --state=retry

Это покажет все задачи, которые будут повторно попытаны в будущем. Результат будет включать ожидаемое время следующего выполнения для каждой задачи.

Как только задача исчерпает свои попытки повтора, она перейдет в состояние Archived и больше не будет повторно выполняться (вы по-прежнему можете запускать архивированные задачи вручную с помощью инструментов CLI или WebUI).

Прежде чем завершить этот учебник, давайте исправим наш обработчик.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Отправка приветственного письма пользователю %d", p.UserID)
    return nil 
}