Guia de Início
Neste tutorial, vamos escrever dois programas, client
e workers
.
-
client.go
irá criar e agendar tarefas para serem processadas de forma assíncrona por threads de trabalho em segundo plano. -
workers.go
irá iniciar várias threads de trabalho concorrentes para lidar com tarefas criadas pelo cliente.
Este guia parte do pressuposto de que você está executando um servidor Redis em localhost:6379
. Antes de começar, certifique-se de que o Redis está instalado e em execução.
Vamos primeiro criar nossos dois arquivos principais.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
Em seguida, instale o pacote asynq
.
go get -u github.com/hibiken/asynq
Antes de começarmos a escrever o código, vamos revisar alguns tipos principais que serão usados nestes dois programas.
Opções de Conexão com o Redis
Asynq usa o Redis como um corretor de mensagens. Tanto client.go
quanto workers.go
precisam se conectar ao Redis para operações de leitura e escrita. Usaremos RedisClientOpt
para especificar a conexão com o servidor Redis em execução local.
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// A senha pode ser omitida se não for necessária
Password: "minhasenha",
// Use um número de banco de dados dedicado para asynq.
// Por padrão, o Redis fornece 16 bancos de dados (0 a 15).
DB: 0,
}
Tarefas
No asynq
, unidades de trabalho são encapsuladas em um tipo chamado Task
, que conceitualmente possui dois campos: Type
e Payload
.
// Type é um valor de string que indica o tipo da tarefa.
func (t *Task) Type() string
// Payload é os dados necessários para a execução da tarefa.
func (t *Task) Payload() []byte
Agora que revisamos os tipos principais, vamos começar a escrever nossos programas.
Programa Cliente
Em client.go
, criaremos algumas tarefas e as colocaremos na fila de espera usando asynq.Client
.
Para criar uma tarefa, você pode usar a função NewTask
e passar o tipo e o payload da tarefa.
O método Enqueue
recebe uma tarefa e qualquer número de opções. Use as opções ProcessIn
ou ProcessAt
para agendar tarefas para processamento futuro.
// Payload relacionado a tarefas de email.
type EmailTaskPayload struct {
// ID do destinatário do email.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// Crie uma tarefa com nome de tipo e payload.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// Processar as tarefas imediatamente.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tarefa enfileirada com sucesso: %+v", info)
// Processar as tarefas após 24 horas.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tarefa enfileirada com sucesso: %+v", info)
}
Isso é tudo que precisamos para o nosso programa cliente.
Programa de Trabalhadores
Em workers.go
, vamos criar uma instância de asynq.Server
para iniciar os trabalhadores.
A função NewServer
recebe RedisConnOpt
e Config
como parâmetros.
O Config
é usado para ajustar o comportamento do processamento de tarefas do servidor.
Você pode consultar a documentação do Config
para conhecer todas as opções de configuração disponíveis.
Para simplificar, neste exemplo, apenas especificamos a concorrência.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Observação: Na próxima seção, vamos introduzir o que é um `handler`.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
O parâmetro do método (*Server).Run
é uma interface asynq.Handler
, que possui um método ProcessTask
.
type Handler interface {
// Se a tarefa for processada com sucesso, ProcessTask deve retornar nil.
// Se ProcessTask retornar um erro não nulo ou causar um pânico, a tarefa será reprocessada mais tarde.
ProcessTask(context.Context, *Task) error
}
A forma mais simples de implementar um manipulador é definir uma função com a mesma assinatura e usar o tipo adaptador asynq.HandlerFunc
ao passá-la para Run
.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando email de boas-vindas para o usuário %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando lembrete por email para o usuário %d", p.UserID)
default:
return fmt.Errorf("Tipo de tarefa inesperado: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Use o adaptador asynq.HandlerFunc para manipular a função
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
Podemos continuar adicionando casos de switch para esta função de manipulador, mas em um aplicativo real, será mais conveniente definir a lógica para cada caso em uma função separada.
Para refatorar nosso código, vamos usar o ServeMux
para criar nosso manipulador. Assim como o ServeMux
do pacote "net/http"
, você pode registrar um manipulador chamando Handle
ou HandleFunc
. ServeMux
satisfaz a interface Handler
, então pode ser passado para (*Server).Run
.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando email de boas-vindas para o usuário %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando lembrete por email para o usuário %d", p.UserID)
return nil
}
Agora que extraímos as funções de manipulação para cada tipo de tarefa, o código parece mais organizado. No entanto, o código ainda é um pouco implícito demais. Temos esses valores de string para tipos de tarefas e tipos de carga, e devemos encapsulá-los em um pacote orgânico. Vamos refatorar nosso código e escrever um pacote para encapsular a criação e manipulação de tarefas. Simplesmente criamos um pacote chamado task
.
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"time"
"github.com/hibiken/asynq"
)
// Lista de tipos de tarefas.
const (
TypeWelcomeEmail = "email:boas-vindas"
TypeReminderEmail = "email:lembrete"
)
// Payload para qualquer tarefa relacionada a e-mails.
type EmailTaskPayload struct {
// ID do destinatário do e-mail.
UserID int
}
func NovoEmailBoasVindas(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NovoEmailLembrete(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func LidarComTarefaEmailBoasVindas(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando e-mail de boas-vindas para o usuário %d", p.UserID)
return nil
}
func LidarComTarefaEmailLembrete(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando e-mail de lembrete para o usuário %d", p.UserID)
return nil
}
Agora podemos importar este pacote em client.go
e workers.go
.
```go
// client.go
func main() {
cliente := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NovoEmailBoasVindas(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NovoEmailLembrete(42)
if err != nil {
log.Fatal(err)
}
// Colocar a tarefa na fila imediatamente.
info, err := cliente.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tarefa enfileirada com sucesso: %+v", info)
// Colocar a tarefa na fila para ser processada após 24 horas.
info, err = cliente.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Tarefa enfileirada com sucesso: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.LidarComTarefaEmailBoasVindas)
mux.HandleFunc(task.TypeReminderEmail, task.LidarComTarefaEmailLembrete)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
O código está ficando melhor agora!
Agora que temos o cliente
e os workers
prontos, podemos executar esses dois programas. Vamos começar executando o programa cliente
para criar e agendar tarefas.
go run client/client.go
Isso criará duas tarefas: uma para processamento imediato e outra para processamento após 24 horas.
Vamos usar a interface de linha de comando asynq
para inspecionar as tarefas.
asynq dash
Você deverá ver uma tarefa no estado Enfileirada e outra tarefa no estado Agendada.
Nota: Para entender o significado de cada estado, consulte a Life Cycle of a Task.
Finalmente, vamos iniciar o programa workers
para lidar com as tarefas.
go run workers/workers.go
Nota: Este programa não será encerrado até que você envie um sinal para terminá-lo. Para as melhores práticas sobre como encerrar os workers em segundo plano com segurança, consulte a página Wiki de Sinais.
Você deverá ver alguma saída de texto no terminal, indicando o processamento bem-sucedido das tarefas.
Você pode executar novamente o programa cliente
para ver como os workers as aceitam e as processam.
Não é incomum que uma tarefa falhe ao ser processada com sucesso na primeira tentativa. Por padrão, as tarefas falhas serão repetidas 25 vezes com espera exponencial. Vamos atualizar nosso handler para retornar um erro e simular uma situação mal sucedida.
// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Tentando enviar um e-mail de boas-vindas para o usuário %d...", p.UserID)
return fmt.Errorf("Falha ao enviar o e-mail para o usuário")
}
Vamos reiniciar nosso programa de workers e enfileirar uma tarefa.
go run workers/workers.go
go run client/client.go
Se estiver executando asynq dash
, você deverá ver uma tarefa no estado de Retry (navegando para a visualização de detalhes da fila e destacando a guia "retry").
Para verificar quais tarefas estão no estado de retry, você também pode executar:
asynq task ls --queue=default --state=retry
Isso listará todas as tarefas que serão reexecutadas no futuro. A saída inclui o horário esperado da próxima execução para cada tarefa.
Uma vez que uma tarefa tenha esgotado as tentativas de retry, ela será movida para o estado de Arquivado e não será reexecutada novamente (ainda é possível executar manualmente tarefas arquivadas usando as ferramentas CLI ou WebUI).
Antes de concluir este tutorial, vamos corrigir nosso manipulador.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Enviando um e-mail de boas-vindas para o usuário %d", p.UserID)
return nil
}