Guida introduttiva
In questo tutorial, scriveremo due programmi, client
e workers
.
-
client.go
creerà e programmerà attività da elaborare in modo asincrono dai thread worker in background. -
workers.go
avvierà più thread worker concorrenti per gestire le attività create dal client.
Questa guida presuppone che tu stia eseguendo un server Redis su localhost:6379
. Prima di iniziare, assicurati che Redis sia installato e in esecuzione.
Iniziamo con la creazione dei nostri due file principali.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
Successivamente, installa il pacchetto asynq
.
go get -u github.com/hibiken/asynq
Prima di iniziare a scrivere il codice, esaminiamo alcuni tipi principali che verranno utilizzati in questi due programmi.
Opzioni di connessione a Redis
Asynq utilizza Redis come message broker. Sia client.go
che workers.go
devono connettersi a Redis per operazioni di lettura e scrittura. Utilizzeremo RedisClientOpt
per specificare la connessione al server Redis in esecuzione in locale.
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// La password può essere omessa se non necessaria
Password: "mypassword",
// Utilizza un numero di database dedicato per asynq.
// Per impostazione predefinita, Redis fornisce 16 database (da 0 a 15).
DB: 0,
}
Attività
In asynq
, le unità di lavoro sono incapsulate in un tipo chiamato Task
, che concettualmente ha due campi: Type
e Payload
.
// Type è un valore di stringa che indica il tipo di attività.
func (t *Task) Type() string
// Payload è il dato richiesto per l'esecuzione dell'attività.
func (t *Task) Payload() []byte
Ora che abbiamo esaminato i tipi principali, iniziamo a scrivere i nostri programmi.
Programma Client
In client.go
, creeremo alcune attività e le inseriremo in coda utilizzando asynq.Client
.
Per creare un'attività, è possibile utilizzare la funzione NewTask
e passare il tipo e il payload dell'attività.
Il metodo Enqueue
richiede un'attività e qualsiasi numero di opzioni. Utilizza le opzioni ProcessIn
o ProcessAt
per pianificare attività per la futura elaborazione.
// Payload relativo alle attività di posta elettronica.
type EmailTaskPayload struct {
// ID del destinatario dell'email.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// Crea un'attività con nome del tipo e payload.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// Elabora immediatamente le attività.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Attività inserita in coda con successo: %+v", info)
// Elabora le attività dopo 24 ore.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Attività inserita in coda con successo: %+v", info)
}
Ecco tutto ciò di cui abbiamo bisogno per il nostro programma client.
Programma dei Lavoratori
In workers.go
, creeremo un'istanza di asynq.Server
per avviare i lavoratori.
La funzione NewServer
prende RedisConnOpt
e Config
come parametri.
Il Config
è utilizzato per regolare il comportamento di elaborazione dei compiti del server.
È possibile fare riferimento alla documentazione di Config
per conoscere tutte le opzioni di configurazione disponibili.
Per semplicità, in questo esempio, specifichiamo solo la concorrenza.
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Nota: Nella sezione seguente, introdurremo cos'è un `handler`.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
Il parametro del metodo (*Server).Run
è un'interfaccia asynq.Handler
, che ha un metodo ProcessTask
.
type Handler interface {
// Se il compito viene elaborato con successo, ProcessTask dovrebbe restituire nil.
// Se ProcessTask restituisce un errore diverso da nil o provoca un panic, il compito verrà ritentato più tardi.
ProcessTask(context.Context, *Task) error
}
Il modo più semplice per implementare un handler è definire una funzione con la stessa firma e utilizzare il tipo di adattatore asynq.HandlerFunc
quando la si passa a Run
.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Invio di un'email di benvenuto all'utente %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Invio di un'email promemoria all'utente %d", p.UserID)
default:
return fmt.Errorf("Tipo di compito inaspettato: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Utilizzare l'adattatore asynq.HandlerFunc per gestire la funzione
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
Possiamo continuare ad aggiungere casi switch per questa funzione handler, ma in un'applicazione reale, sarà più conveniente definire la logica per ciascun caso in una funzione separata.
Per raffinare il nostro codice, utilizziamo ServeMux
per creare il nostro gestore. Proprio come il ServeMux
del pacchetto "net/http"
, è possibile registrare un gestore chiamando Handle
o HandleFunc
. ServeMux
soddisfa l'interfaccia Handler
, quindi può essere passato a (*Server).Run
.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Invio di un'email di benvenuto all'utente %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Invio di un'email promemoria all'utente %d", p.UserID)
return nil
}
Ora che abbiamo estratto le funzioni di gestione per ogni tipo di compito, il codice sembra più organizzato. Tuttavia, il codice è ancora un po' troppo implicito. Abbiamo questi valori di stringa per i tipi di compito e i tipi di payload, e dovremmo incapsularli in un pacchetto organico. Raffiniamo il nostro codice e scriviamo un pacchetto per incapsulare la creazione e la gestione dei compiti. Semplicemente creiamo un pacchetto chiamato task
.
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// Elenco dei tipi di attività.
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// Payload per qualsiasi attività legata alle email.
type EmailTaskPayload struct {
// ID del destinatario dell'email.
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Invio dell'email di benvenuto all'utente %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Invio dell'email di promemoria all'utente %d", p.UserID)
return nil
}
Ora possiamo importare questo pacchetto in client.go
e workers.go
.
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// Accoda immediatamente l'attività.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Attività accodata con successo: %+v", info)
// Accoda l'attività da elaborare dopo 24 ore.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Attività accodata con successo: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
Il codice ora appare più pulito!
Ora che abbiamo il client
e i workers
pronti, possiamo eseguire questi due programmi. Cominciamo eseguendo il programma client
per creare e pianificare le attività.
go run client/client.go
Ciò creerà due attività: una per l'elaborazione immediata e un'altra per l'elaborazione dopo 24 ore.
Utilizziamo l'interfaccia della riga di comando asynq
per ispezionare le attività.
asynq dash
Dovresti essere in grado di vedere un'attività nello stato Accodato (Enqueued) e un'altra attività nello stato Pianificato (Scheduled).
Nota: Per comprendere il significato di ciascuno stato, fare riferimento alla Vita di un'attività.
Infine, avviamo il programma workers
per gestire le attività.
go run workers/workers.go
Nota: Questo programma non uscirà finché non si invierà un segnale per terminarlo. Per le migliori pratiche su come terminare in modo sicuro i lavoratori in background, fare riferimento alla pagina Wiki sui Segnali.
Dovresti essere in grado di vedere un output di testo nel terminale, che indica il corretto completamento delle attività.
Puoi eseguire nuovamente il programma client
per vedere come i lavoratori le accolgono e le elaborano.
Non è infrequente che un'attività non riesca a elaborarsi con successo al primo tentativo. Per impostazione predefinita, le attività fallite verranno riprovate 25 volte con ritardi esponenziali. Modifichiamo il nostro gestore per restituire un errore e simulare una situazione non riuscita.
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Tentativo di inviare una email di benvenuto all'utente %d...", p.UserID)
return fmt.Errorf("Impossibile inviare l'email all'utente")
}
Riavviamo il programma dei nostri worker e accodiamo un task.
go run workers/workers.go
go run client/client.go
Se stai eseguendo asynq dash
, dovresti vedere un task nello stato Ritentare (navigando alla vista dettagli coda e evidenziando la scheda "retry").
Per controllare quali task sono nello stato di ritentare, puoi eseguire anche il seguente comando:
asynq task ls --queue=default --state=retry
Verranno elencati tutti i task che verranno ritentati in futuro. L'output include l'orario previsto della prossima esecuzione per ciascun task.
Una volta esauriti i tentativi di ritentare di un task, passerà allo stato Archiviato e non verrà ritentato di nuovo (è comunque possibile eseguire manualmente i task archiviati utilizzando strumenti CLI o WebUI).
Prima di concludere questo tutorial, correggiamo il nostro gestore.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Invio di una email di benvenuto all'utente %d", p.UserID)
return nil
}