Guía de inicio

En este tutorial, escribiremos dos programas, client y workers.

  • client.go creará y programará tareas para ser procesadas de forma asincrónica por hilos de trabajo en segundo plano.
  • workers.go iniciará múltiples hilos de trabajo concurrentes para manejar las tareas creadas por el cliente.

Esta guía asume que estás ejecutando un servidor Redis en localhost:6379. Antes de comenzar, asegúrate de que Redis esté instalado y en funcionamiento.

Comencemos creando nuestros dos archivos principales.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Luego, instala el paquete asynq.

go get -u github.com/hibiken/asynq

Antes de comenzar a escribir el código, revisemos algunos tipos principales que se usarán en estos dos programas.

Opciones de Conexión a Redis

Asynq utiliza Redis como intermediario de mensajes. Tanto client.go como workers.go necesitan conectarse a Redis para operaciones de lectura y escritura. Utilizaremos RedisClientOpt para especificar la conexión al servidor Redis que se está ejecutando localmente.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // La contraseña puede ser omitida si no es necesaria
    Password: "micontraseña",
    // Utilizar un número de base de datos dedicado para asynq.
    // Por defecto, Redis proporciona 16 bases de datos (0 a 15).
    DB: 0,
}

Tareas

En asynq, las unidades de trabajo están encapsuladas en un tipo llamado Task, que conceptualmente tiene dos campos: Type y Payload.

// Type es un valor de string que indica el tipo de la tarea.
func (t *Task) Type() string

// Payload es la datos requeridos para la ejecución de la tarea.
func (t *Task) Payload() []byte

Ahora que hemos revisado los tipos principales, comencemos a escribir nuestros programas.

Programa Cliente

En client.go, crearemos algunas tareas y las encolaremos usando asynq.Client.

Para crear una tarea, puedes utilizar la función NewTask y pasar el tipo y el payload de la tarea.

El método Enqueue toma una tarea y cualquier número de opciones. Utiliza las opciones ProcessIn o ProcessAt para programar tareas para su procesamiento futuro.

// Payload relacionado con las tareas de correo electrónico.
type EmailTaskPayload struct {
    // ID del destinatario del correo electrónico.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Crea una tarea con nombre de tipo y payload.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Procesa las tareas inmediatamente.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Tarea encolada correctamente: %+v", info)

    // Procesa las tareas después de 24 horas.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Tarea encolada correctamente: %+v", info)
}

Eso es todo lo que necesitamos para nuestro programa cliente.

Programa de trabajadores

En workers.go, crearemos una instancia de asynq.Server para iniciar los trabajadores.

La función NewServer toma RedisConnOpt y Config como parámetros.

El Config se utiliza para ajustar el comportamiento del procesamiento de tareas del servidor. Puede consultar la documentación de Config para conocer todas las opciones de configuración disponibles.

Para simplificar, en este ejemplo, solo especificamos la concurrencia.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Nota: En la siguiente sección, presentaremos qué es un `handler`.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

El parámetro del método (*Server).Run es una interfaz asynq.Handler, que tiene un método ProcessTask.

type Handler interface {
    // Si la tarea se procesa correctamente, ProcessTask debería devolver nil.
    // Si ProcessTask devuelve un error distinto de nulo o provoca un pánico, la tarea se reintentará más tarde.
    ProcessTask(context.Context, *Task) error
}

La forma más sencilla de implementar un handler es definir una función con la misma firma y usar el tipo adaptador asynq.HandlerFunc al pasarlo a Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Enviando correo de bienvenida al usuario %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Enviando recordatorio por correo al usuario %d", p.UserID)

    default:
        return fmt.Errorf("Tipo de tarea inesperado: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Usar el adaptador asynq.HandlerFunc para manejar la función
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Podemos seguir agregando casos de switch para esta función de handler, pero en una aplicación real, será más conveniente definir la lógica para cada caso en una función separada.

Para refactorizar nuestro código, usemos ServeMux para crear nuestro handler. Al igual que el ServeMux del paquete "net/http", puede registrar un handler llamando a Handle o HandleFunc. ServeMux satisface la interfaz Handler, por lo que se puede pasar a (*Server).Run.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Enviando correo de bienvenida al usuario %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Enviando recordatorio por correo al usuario %d", p.UserID)
    return nil
}

Ahora que hemos extraído las funciones de manejo para cada tipo de tarea, el código se ve más organizado. Sin embargo, el código aún es un poco demasiado implícito. Tenemos estos valores de cadena para los tipos de tarea y los tipos de payload, y deberíamos encapsularlos en un paquete orgánico. Refactoricemos nuestro código y escribamos un paquete para encapsular la creación y el manejo de tareas. Simplemente creamos un paquete llamado task.

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// Lista de tipos de tarea.
const (
    TypeWelcomeEmail  = "email:bienvenida"
    TypeReminderEmail = "email:recordatorio"
)

// Carga útil para cualquier tarea relacionada con correos electrónicos.
type EmailTaskPayload struct {
    // ID del destinatario del correo electrónico.
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Enviando correo de bienvenida al usuario %d", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Enviando correo de recordatorio al usuario %d", p.UserID)
    return nil
}

Ahora podemos importar este paquete en client.go y workers.go.

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // Encolar la tarea inmediatamente.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Tarea encolada con éxito: %+v", info)

    // Encolar la tarea para ser procesada después de 24 horas.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Tarea encolada con éxito: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

¡El código se ve mejor ahora!

Ahora que tenemos el client y los workers listos, podemos ejecutar estos dos programas. Comencemos ejecutando el programa client para crear y programar tareas.

go run client/client.go

Esto creará dos tareas: una para procesamiento inmediato y otra para procesar después de 24 horas.

Usemos la interfaz de línea de comandos asynq para inspeccionar las tareas.

asynq dash

Debería poder ver una tarea en estado Enqueued y otra tarea en estado Scheduled.

Nota: Para comprender el significado de cada estado, consulte la Vida de una Tarea.

Finalmente, iniciemos el programa workers para manejar las tareas.

go run workers/workers.go

Nota: Este programa no se cerrará hasta que envíe una señal para terminarlo. Para conocer las mejores prácticas sobre cómo terminar de manera segura los trabajadores en segundo plano, consulte la página de Señales Wiki.

Debería poder ver texto de salida en la terminal, lo que indica el procesamiento exitoso de las tareas.

Puede ejecutar el programa client nuevamente para ver cómo los trabajadores las aceptan y las procesan.

No es raro que una tarea falle al procesarse correctamente en el primer intento. De forma predeterminada, las tareas fallidas se reintentarán 25 veces con reintentos exponenciales. Actualicemos nuestro controlador para devolver un error y simular una situación fallida.

// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Intentando enviar un correo de bienvenida al usuario %d...", p.UserID)
    return fmt.Errorf("Error al enviar el correo al usuario")
}

Volvamos a iniciar nuestro programa de workers y enfilemos una tarea.

go run workers/workers.go
go run client/client.go

Si estás ejecutando asynq dash, deberías poder ver una tarea en estado Reintentar (navegando a la vista de detalles de la cola y resaltando la pestaña "reintentar").

Para comprobar qué tareas están en estado de reintentar, también puedes ejecutar:

asynq task ls --queue=default --state=retry

Esto listar todas las tareas que serán reintentadas en el futuro. La salida incluye la hora esperada de la próxima ejecución para cada tarea.

Una vez que una tarea haya agotado sus intentos de reintentar, pasará al estado de Archivado y no será reintentada de nuevo (aún puedes ejecutar manualmente las tareas archivadas usando las herramientas CLI o WebUI).

Antes de concluir este tutorial, arreglemos nuestro manejador.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Enviando un correo de bienvenida al usuario %d", p.UserID)
    return nil 
}