Guia de Início

Neste tutorial, vamos escrever dois programas, client e workers.

  • client.go irá criar e agendar tarefas para serem processadas de forma assíncrona por threads de trabalho em segundo plano.
  • workers.go irá iniciar várias threads de trabalho concorrentes para lidar com tarefas criadas pelo cliente.

Este guia parte do pressuposto de que você está executando um servidor Redis em localhost:6379. Antes de começar, certifique-se de que o Redis está instalado e em execução.

Vamos primeiro criar nossos dois arquivos principais.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Em seguida, instale o pacote asynq.

go get -u github.com/hibiken/asynq

Antes de começarmos a escrever o código, vamos revisar alguns tipos principais que serão usados nestes dois programas.

Opções de Conexão com o Redis

Asynq usa o Redis como um corretor de mensagens. Tanto client.go quanto workers.go precisam se conectar ao Redis para operações de leitura e escrita. Usaremos RedisClientOpt para especificar a conexão com o servidor Redis em execução local.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // A senha pode ser omitida se não for necessária
    Password: "minhasenha",
    // Use um número de banco de dados dedicado para asynq.
    // Por padrão, o Redis fornece 16 bancos de dados (0 a 15).
    DB: 0,
}

Tarefas

No asynq, unidades de trabalho são encapsuladas em um tipo chamado Task, que conceitualmente possui dois campos: Type e Payload.

// Type é um valor de string que indica o tipo da tarefa.
func (t *Task) Type() string

// Payload é os dados necessários para a execução da tarefa.
func (t *Task) Payload() []byte

Agora que revisamos os tipos principais, vamos começar a escrever nossos programas.

Programa Cliente

Em client.go, criaremos algumas tarefas e as colocaremos na fila de espera usando asynq.Client.

Para criar uma tarefa, você pode usar a função NewTask e passar o tipo e o payload da tarefa.

O método Enqueue recebe uma tarefa e qualquer número de opções. Use as opções ProcessIn ou ProcessAt para agendar tarefas para processamento futuro.

// Payload relacionado a tarefas de email.
type EmailTaskPayload struct {
    // ID do destinatário do email.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Crie uma tarefa com nome de tipo e payload.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Processar as tarefas imediatamente.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Tarefa enfileirada com sucesso: %+v", info)

    // Processar as tarefas após 24 horas.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Tarefa enfileirada com sucesso: %+v", info)
}

Isso é tudo que precisamos para o nosso programa cliente.

Programa de Trabalhadores

Em workers.go, vamos criar uma instância de asynq.Server para iniciar os trabalhadores.

A função NewServer recebe RedisConnOpt e Config como parâmetros.

O Config é usado para ajustar o comportamento do processamento de tarefas do servidor. Você pode consultar a documentação do Config para conhecer todas as opções de configuração disponíveis.

Para simplificar, neste exemplo, apenas especificamos a concorrência.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Observação: Na próxima seção, vamos introduzir o que é um `handler`.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

O parâmetro do método (*Server).Run é uma interface asynq.Handler, que possui um método ProcessTask.

type Handler interface {
    // Se a tarefa for processada com sucesso, ProcessTask deve retornar nil.
    // Se ProcessTask retornar um erro não nulo ou causar um pânico, a tarefa será reprocessada mais tarde.
    ProcessTask(context.Context, *Task) error
}

A forma mais simples de implementar um manipulador é definir uma função com a mesma assinatura e usar o tipo adaptador asynq.HandlerFunc ao passá-la para Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Enviando email de boas-vindas para o usuário %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Enviando lembrete por email para o usuário %d", p.UserID)

    default:
        return fmt.Errorf("Tipo de tarefa inesperado: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Use o adaptador asynq.HandlerFunc para manipular a função
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Podemos continuar adicionando casos de switch para esta função de manipulador, mas em um aplicativo real, será mais conveniente definir a lógica para cada caso em uma função separada.

Para refatorar nosso código, vamos usar o ServeMux para criar nosso manipulador. Assim como o ServeMux do pacote "net/http", você pode registrar um manipulador chamando Handle ou HandleFunc. ServeMux satisfaz a interface Handler, então pode ser passado para (*Server).Run.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Enviando email de boas-vindas para o usuário %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Enviando lembrete por email para o usuário %d", p.UserID)
    return nil
}

Agora que extraímos as funções de manipulação para cada tipo de tarefa, o código parece mais organizado. No entanto, o código ainda é um pouco implícito demais. Temos esses valores de string para tipos de tarefas e tipos de carga, e devemos encapsulá-los em um pacote orgânico. Vamos refatorar nosso código e escrever um pacote para encapsular a criação e manipulação de tarefas. Simplesmente criamos um pacote chamado task.

mkdir task && touch task/task.go
package task

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

// Lista de tipos de tarefas.
const (
	TypeWelcomeEmail  = "email:boas-vindas"
	TypeReminderEmail = "email:lembrete"
)

// Payload para qualquer tarefa relacionada a e-mails.
type EmailTaskPayload struct {
	// ID do destinatário do e-mail.
	UserID int
}

func NovoEmailBoasVindas(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NovoEmailLembrete(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeReminderEmail, payload), nil
}

func LidarComTarefaEmailBoasVindas(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Enviando e-mail de boas-vindas para o usuário %d", p.UserID)
	return nil
}

func LidarComTarefaEmailLembrete(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Enviando e-mail de lembrete para o usuário %d", p.UserID)
	return nil
}

Agora podemos importar este pacote em client.go e workers.go.

```go
// client.go
func main() {
	cliente := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

	t1, err := task.NovoEmailBoasVindas(42)
	if err != nil {
		log.Fatal(err)
	}

	t2, err := task.NovoEmailLembrete(42)
	if err != nil {
		log.Fatal(err)
	}

	// Colocar a tarefa na fila imediatamente.
	info, err := cliente.Enqueue(t1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] Tarefa enfileirada com sucesso: %+v", info)

	// Colocar a tarefa na fila para ser processada após 24 horas.
	info, err = cliente.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] Tarefa enfileirada com sucesso: %+v", info)
}
// workers.go
func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{Concurrency: 10},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc(task.TypeWelcomeEmail, task.LidarComTarefaEmailBoasVindas)
	mux.HandleFunc(task.TypeReminderEmail, task.LidarComTarefaEmailLembrete)

	if err := srv.Run(mux); err != nil {
		log.Fatal(err)
	}
}

O código está ficando melhor agora!

Agora que temos o cliente e os workers prontos, podemos executar esses dois programas. Vamos começar executando o programa cliente para criar e agendar tarefas.

go run client/client.go

Isso criará duas tarefas: uma para processamento imediato e outra para processamento após 24 horas.

Vamos usar a interface de linha de comando asynq para inspecionar as tarefas.

asynq dash

Você deverá ver uma tarefa no estado Enfileirada e outra tarefa no estado Agendada.

Nota: Para entender o significado de cada estado, consulte a Life Cycle of a Task.

Finalmente, vamos iniciar o programa workers para lidar com as tarefas.

go run workers/workers.go

Nota: Este programa não será encerrado até que você envie um sinal para terminá-lo. Para as melhores práticas sobre como encerrar os workers em segundo plano com segurança, consulte a página Wiki de Sinais.

Você deverá ver alguma saída de texto no terminal, indicando o processamento bem-sucedido das tarefas.

Você pode executar novamente o programa cliente para ver como os workers as aceitam e as processam.

Não é incomum que uma tarefa falhe ao ser processada com sucesso na primeira tentativa. Por padrão, as tarefas falhas serão repetidas 25 vezes com espera exponencial. Vamos atualizar nosso handler para retornar um erro e simular uma situação mal sucedida.

// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Tentando enviar um e-mail de boas-vindas para o usuário %d...", p.UserID)
    return fmt.Errorf("Falha ao enviar o e-mail para o usuário")
}

Vamos reiniciar nosso programa de workers e enfileirar uma tarefa.

go run workers/workers.go
go run client/client.go

Se estiver executando asynq dash, você deverá ver uma tarefa no estado de Retry (navegando para a visualização de detalhes da fila e destacando a guia "retry").

Para verificar quais tarefas estão no estado de retry, você também pode executar:

asynq task ls --queue=default --state=retry

Isso listará todas as tarefas que serão reexecutadas no futuro. A saída inclui o horário esperado da próxima execução para cada tarefa.

Uma vez que uma tarefa tenha esgotado as tentativas de retry, ela será movida para o estado de Arquivado e não será reexecutada novamente (ainda é possível executar manualmente tarefas arquivadas usando as ferramentas CLI ou WebUI).

Antes de concluir este tutorial, vamos corrigir nosso manipulador.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Enviando um e-mail de boas-vindas para o usuário %d", p.UserID)
    return nil 
}