Guía de inicio
En este tutorial, escribiremos dos programas, client
y workers
.
-
client.go
creará y programará tareas para ser procesadas de forma asincrónica por hilos de trabajo en segundo plano. -
workers.go
iniciará múltiples hilos de trabajo concurrentes para manejar las tareas creadas por el cliente.
Esta guía asume que estás ejecutando un servidor Redis en localhost:6379
. Antes de comenzar, asegúrate de que Redis esté instalado y en funcionamiento.
Comencemos creando nuestros dos archivos principales.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
Luego, instala el paquete asynq
.
go get -u github.com/hibiken/asynq
Antes de comenzar a escribir el código, revisemos algunos tipos principales que se usarán en estos dos programas.
Opciones de Conexión a Redis
Asynq utiliza Redis como intermediario de mensajes. Tanto client.go
como workers.go
necesitan conectarse a Redis para operaciones de lectura y escritura. Utilizaremos RedisClientOpt
para especificar la conexión al servidor Redis que se está ejecutando localmente.
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// La contraseña puede ser omitida si no es necesaria
Password: "micontraseña",
// Utilizar un número de base de datos dedicado para asynq.
// Por defecto, Redis proporciona 16 bases de datos (0 a 15).
DB: 0,
}
Tareas
En asynq
, las unidades de trabajo están encapsuladas en un tipo llamado Task
, que conceptualmente tiene dos campos: Type
y Payload
.
// Type es un valor de string que indica el tipo de la tarea.
func (t *Task) Type() string
// Payload es la datos requeridos para la ejecución de la tarea.
func (t *Task) Payload() []byte
Ahora que hemos revisado los tipos principales, comencemos a escribir nuestros programas.
Programa Cliente
En client.go
, crearemos algunas tareas y las encolaremos usando asynq.Client
.
Para crear una tarea, puedes utilizar la función NewTask
y pasar el tipo y el payload de la tarea.
El método Enqueue
toma una tarea y cualquier número de opciones. Utiliza las opciones ProcessIn
o ProcessAt
para programar tareas para su procesamiento futuro.
// Payload relacionado con las tareas de correo electrónico.
type EmailTaskPayload struct {
// ID del destinatario del correo electrónico.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// Crea una tarea con nombre de tipo y payload.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// Procesa las tareas inmediatamente.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tarea encolada correctamente: %+v", info)
// Procesa las tareas después de 24 horas.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tarea encolada correctamente: %+v", info)
}
Eso es todo lo que necesitamos para nuestro programa cliente.
Programa de trabajadores
En workers.go
, crearemos una instancia de asynq.Server
para iniciar los trabajadores.
La función NewServer
toma RedisConnOpt
y Config
como parámetros.
El Config
se utiliza para ajustar el comportamiento del procesamiento de tareas del servidor.
Puede consultar la documentación de Config
para conocer todas las opciones de configuración disponibles.
Para simplificar, en este ejemplo, solo especificamos la concurrencia.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Nota: En la siguiente sección, presentaremos qué es un `handler`.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
El parámetro del método (*Server).Run
es una interfaz asynq.Handler
, que tiene un método ProcessTask
.
type Handler interface {
// Si la tarea se procesa correctamente, ProcessTask debería devolver nil.
// Si ProcessTask devuelve un error distinto de nulo o provoca un pánico, la tarea se reintentará más tarde.
ProcessTask(context.Context, *Task) error
}
La forma más sencilla de implementar un handler es definir una función con la misma firma y usar el tipo adaptador asynq.HandlerFunc
al pasarlo a Run
.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando correo de bienvenida al usuario %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando recordatorio por correo al usuario %d", p.UserID)
default:
return fmt.Errorf("Tipo de tarea inesperado: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Usar el adaptador asynq.HandlerFunc para manejar la función
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
Podemos seguir agregando casos de switch para esta función de handler, pero en una aplicación real, será más conveniente definir la lógica para cada caso en una función separada.
Para refactorizar nuestro código, usemos ServeMux
para crear nuestro handler. Al igual que el ServeMux
del paquete "net/http"
, puede registrar un handler llamando a Handle
o HandleFunc
. ServeMux
satisface la interfaz Handler
, por lo que se puede pasar a (*Server).Run
.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando correo de bienvenida al usuario %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando recordatorio por correo al usuario %d", p.UserID)
return nil
}
Ahora que hemos extraído las funciones de manejo para cada tipo de tarea, el código se ve más organizado. Sin embargo, el código aún es un poco demasiado implícito. Tenemos estos valores de cadena para los tipos de tarea y los tipos de payload, y deberíamos encapsularlos en un paquete orgánico. Refactoricemos nuestro código y escribamos un paquete para encapsular la creación y el manejo de tareas. Simplemente creamos un paquete llamado task
.
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// Lista de tipos de tarea.
const (
TypeWelcomeEmail = "email:bienvenida"
TypeReminderEmail = "email:recordatorio"
)
// Carga útil para cualquier tarea relacionada con correos electrónicos.
type EmailTaskPayload struct {
// ID del destinatario del correo electrónico.
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando correo de bienvenida al usuario %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando correo de recordatorio al usuario %d", p.UserID)
return nil
}
Ahora podemos importar este paquete en client.go
y workers.go
.
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// Encolar la tarea inmediatamente.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tarea encolada con éxito: %+v", info)
// Encolar la tarea para ser procesada después de 24 horas.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tarea encolada con éxito: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
¡El código se ve mejor ahora!
Ahora que tenemos el client
y los workers
listos, podemos ejecutar estos dos programas. Comencemos ejecutando el programa client
para crear y programar tareas.
go run client/client.go
Esto creará dos tareas: una para procesamiento inmediato y otra para procesar después de 24 horas.
Usemos la interfaz de línea de comandos asynq
para inspeccionar las tareas.
asynq dash
Debería poder ver una tarea en estado Enqueued y otra tarea en estado Scheduled.
Nota: Para comprender el significado de cada estado, consulte la Vida de una Tarea.
Finalmente, iniciemos el programa workers
para manejar las tareas.
go run workers/workers.go
Nota: Este programa no se cerrará hasta que envíe una señal para terminarlo. Para conocer las mejores prácticas sobre cómo terminar de manera segura los trabajadores en segundo plano, consulte la página de Señales Wiki.
Debería poder ver texto de salida en la terminal, lo que indica el procesamiento exitoso de las tareas.
Puede ejecutar el programa client
nuevamente para ver cómo los trabajadores las aceptan y las procesan.
No es raro que una tarea falle al procesarse correctamente en el primer intento. De forma predeterminada, las tareas fallidas se reintentarán 25 veces con reintentos exponenciales. Actualicemos nuestro controlador para devolver un error y simular una situación fallida.
// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Intentando enviar un correo de bienvenida al usuario %d...", p.UserID)
return fmt.Errorf("Error al enviar el correo al usuario")
}
Volvamos a iniciar nuestro programa de workers y enfilemos una tarea.
go run workers/workers.go
go run client/client.go
Si estás ejecutando asynq dash
, deberías poder ver una tarea en estado Reintentar (navegando a la vista de detalles de la cola y resaltando la pestaña "reintentar").
Para comprobar qué tareas están en estado de reintentar, también puedes ejecutar:
asynq task ls --queue=default --state=retry
Esto listar todas las tareas que serán reintentadas en el futuro. La salida incluye la hora esperada de la próxima ejecución para cada tarea.
Una vez que una tarea haya agotado sus intentos de reintentar, pasará al estado de Archivado y no será reintentada de nuevo (aún puedes ejecutar manualmente las tareas archivadas usando las herramientas CLI o WebUI).
Antes de concluir este tutorial, arreglemos nuestro manejador.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando un correo de bienvenida al usuario %d", p.UserID)
return nil
}