Guide de démarrage

Dans ce tutoriel, nous allons écrire deux programmes, client et workers.

  • client.go va créer et planifier des tâches à traiter de manière asynchrone par des threads de travail en arrière-plan.
  • workers.go va démarrer plusieurs threads de travail concurrents pour gérer les tâches créées par le client.

Ce guide suppose que vous exécutez un serveur Redis sur localhost:6379. Avant de commencer, assurez-vous que Redis est installé et en cours d'exécution.

Commençons par créer nos deux fichiers principaux.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Ensuite, installez le package asynq.

go get -u github.com/hibiken/asynq

Avant de commencer à écrire le code, passons en revue quelques types principaux qui seront utilisés dans ces deux programmes.

Options de connexion à Redis

Asynq utilise Redis comme courtier de messages. client.go et workers.go doivent tous deux se connecter à Redis pour des opérations de lecture et d'écriture. Nous utiliserons RedisClientOpt pour spécifier la connexion au serveur Redis s'exécutant localement.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Le mot de passe peut être omis s'il n'est pas nécessaire
    Password: "monmotdepasse",
    // Utilisez un numéro de base de données dédié pour asynq.
    // Par défaut, Redis fournit 16 bases de données (de 0 à 15).
    DB: 0,
}

Tâches

Dans asynq, les unités de travail sont encapsulées dans un type appelé Task, qui a conceptuellement deux champs : Type et Payload.

// Type est une valeur de chaîne qui indique le type de la tâche.
func (t *Task) Type() string

// Payload est la donnée requise pour l'exécution de la tâche.
func (t *Task) Payload() []byte

Maintenant que nous avons examiné les types principaux, commençons à écrire nos programmes.

Programme Client

Dans client.go, nous allons créer quelques tâches et les mettre en file d'attente à l'aide de asynq.Client.

Pour créer une tâche, vous pouvez utiliser la fonction NewTask et passer le type et la charge utile de la tâche.

La méthode Enqueue prend une tâche et un nombre quelconque d'options. Utilisez les options ProcessIn ou ProcessAt pour planifier des tâches pour un traitement futur.

// Charge utile liée aux tâches d'e-mail.
type EmailTaskPayload struct {
    // ID du destinataire de l'e-mail.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Créer une tâche avec un nom de type et une charge utile.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:bienvenue", payload)

    t2 := asynq.NewTask("email:rappel", payload)

    // Traiter les tâches immédiatement.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Tâche mise en file d'attente avec succès : %+v", info)

    // Traiter les tâches après 24 heures.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Tâche mise en file d'attente avec succès : %+v", info)
}

C'est tout ce dont nous avons besoin pour notre programme client.

Programme des travailleurs

Dans workers.go, nous allons créer une instance asynq.Server pour démarrer les travailleurs.

La fonction NewServer prend RedisConnOpt et Config comme paramètres.

Le Config est utilisé pour ajuster le comportement de traitement des tâches du serveur. Vous pouvez consulter la documentation Config pour en savoir plus sur toutes les options de configuration disponibles.

Pour simplifier, dans cet exemple, nous spécifions uniquement la concurrence.

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Remarque : Dans la section suivante, nous introduirons ce qu'est un `handler`.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

Le paramètre de la méthode (*Server).Run est une interface asynq.Handler, qui a une méthode ProcessTask.

type Handler interface {
    // Si la tâche est traitée avec succès, ProcessTask doit renvoyer nil.
    // Si ProcessTask renvoie une erreur non nulle ou provoque une panique, la tâche sera réessayée plus tard.
    ProcessTask(context.Context, *Task) error
}

La manière la plus simple d'implémenter un gestionnaire est de définir une fonction avec la même signature et d'utiliser le type adaptateur asynq.HandlerFunc lors de son passage à Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending welcome email to user %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending reminder email to user %d", p.UserID)

    default:
        return fmt.Errorf("Type de tâche inattendu : %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Utiliser l'adaptateur asynq.HandlerFunc pour gérer la fonction
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Nous pouvons continuer à ajouter des cas switch pour cette fonction de gestion, mais dans une application réelle, il sera plus pratique de définir la logique de chaque cas dans une fonction séparée.

Pour refactoriser notre code, utilisons ServeMux pour créer notre gestionnaire. Tout comme le ServeMux du package "net/http", vous pouvez enregistrer un gestionnaire en appelant Handle ou HandleFunc. ServeMux satisfait l'interface Handler, donc il peut être passé à (*Server).Run.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending welcome email to user %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending reminder email to user %d", p.UserID)
    return nil
}

Maintenant que nous avons extrait les fonctions de traitement pour chaque type de tâche, le code semble plus organisé. Cependant, le code est encore un peu trop implicite. Nous avons ces valeurs de chaîne pour les types de tâches et les types de charges utiles, et nous devrions les encapsuler dans un package organique. Refactorisons notre code et écrivons un package pour encapsuler la création et le traitement des tâches. Créons simplement un package appelé task.

mkdir task && touch task/task.go
package task

import (
	"context"
	"encoding/json"
	"log"

	"github.com/hibiken/asynq"
)

// Liste des types de tâches.
const (
	TypeWelcomeEmail  = "email:bienvenue"
	TypeReminderEmail = "email:rappel"
)

// Charge utile pour toute tâche liée aux e-mails.
type EmailTaskPayload struct {
	// ID du destinataire de l'e-mail.
	UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Envoi d'un e-mail de bienvenue à l'utilisateur %d", p.UserID)
	return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Envoi d'un e-mail de rappel à l'utilisateur %d", p.UserID)
	return nil
}

Nous pouvons maintenant importer ce package dans client.go et workers.go.

```go
// client.go
func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

	t1, err := task.NewWelcomeEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	t2, err := task.NewReminderEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	// Placer la tâche en file d'attente immédiatement.
	info, err := client.Enqueue(t1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] Tâche placée en file d'attente avec succès : %+v", info)

	// Placer la tâche en file d'attente pour être traitée après 24 heures.
	info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] Tâche placée en file d'attente avec succès : %+v", info)
}
// workers.go
func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{Concurrency: 10},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
	mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

	if err := srv.Run(mux); err != nil {
		log.Fatal(err)
	}
}

Le code semble meilleur à présent !

Maintenant que nous avons le client et les workers prêts, nous pouvons exécuter ces deux programmes. Commençons par exécuter le programme client pour créer et programmer des tâches.

go run client/client.go

Cela créera deux tâches : une pour un traitement immédiat et une autre pour un traitement après 24 heures.

Utilisons l'interface de ligne de commande asynq pour inspecter les tâches.

asynq dash

Vous devriez pouvoir voir une tâche dans l'état Enqueued et une autre tâche dans l'état Scheduled.

Remarque : Pour comprendre la signification de chaque état, veuillez vous référer au Cycle de vie d'une tâche.

Enfin, lançons le programme workers pour gérer les tâches.

go run workers/workers.go

Remarque : Ce programme ne se terminera pas tant que vous ne lui enverrez pas un signal pour le terminer. Pour les meilleures pratiques sur la façon de terminer en toute sécurité les workers en arrière-plan, veuillez vous référer à la page Wiki sur les Signaux.

Vous devriez pouvoir voir une sortie de texte dans le terminal, indiquant le traitement réussi des tâches.

Vous pouvez exécuter à nouveau le programme client pour voir comment les workers les acceptent et les traitent.

Il n'est pas rare qu'une tâche échoue à être traitée avec succès dès la première tentative. Par défaut, les tâches échouées seront réessayées 25 fois avec un retour exponnentiel. Mettons à jour notre gestionnaire pour renvoyer une erreur afin de simuler une situation infructueuse.

// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Tentative d'envoi d'un e-mail de bienvenue à l'utilisateur %d...", p.UserID)
    return fmt.Errorf("Échec de l'envoi de l'e-mail à l'utilisateur")
}

Redémarrons notre programme de travailleurs et mettons une tâche en file d'attente.

go run workers/workers.go
go run client/client.go

Si vous exécutez asynq dash, vous devriez pouvoir voir une tâche dans l'état Retry (en accédant à la vue détaillée de la file d'attente et en surlignant l'onglet "retry").

Pour vérifier quelles tâches sont dans l'état de réessai, vous pouvez également exécuter :

asynq task ls --queue=default --state=retry

Cela listera toutes les tâches qui seront réessayées ultérieurement. La sortie inclut l'heure prévue de la prochaine exécution pour chaque tâche.

Une fois qu'une tâche a épuisé ses tentatives de réessai, elle passera à l'état Archived et ne sera plus réessayée (vous pouvez toujours exécuter manuellement les tâches archivées à l'aide des outils CLI ou WebUI).

Avant de conclure ce tutoriel, corrigeons notre gestionnaire.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Envoi d'un e-mail de bienvenue à l'utilisateur %d", p.UserID)
    return nil 
}