Erste Schritte

In diesem Tutorial werden wir zwei Programme namens client und workers schreiben.

  • client.go erstellt und plant Aufgaben, die von Hintergrund-Worker-Threads asynchron verarbeitet werden sollen.
  • workers.go startet mehrere gleichzeitige Worker-Threads, um die vom Client erstellten Aufgaben zu verarbeiten.

In dieser Anleitung wird davon ausgegangen, dass auf localhost:6379 ein Redis-Server läuft. Stellen Sie vor dem Start sicher, dass Redis installiert und ausgeführt wird.

Lassen Sie uns zuerst unsere beiden Hauptdateien erstellen.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Dann installieren Sie das asynq-Paket.

go get -u github.com/hibiken/asynq

Bevor wir mit dem Schreiben des Codes beginnen, werfen wir einen Blick auf einige Kernarten, die in diesen beiden Programmen verwendet werden.

Redis-Verbindungsoptionen

Asynq verwendet Redis als Message Broker. Sowohl client.go als auch workers.go müssen eine Verbindung zu Redis für Lese- und Schreibvorgänge herstellen. Wir verwenden RedisClientOpt, um die Verbindung zum lokal laufenden Redis-Server anzugeben.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Das Passwort kann weggelassen werden, wenn es nicht benötigt wird.
    Password: "mypassword",
    // Verwenden Sie eine dedizierte Datenbanknummer für asynq.
    // Standardmäßig bietet Redis 16 Datenbanken (0 bis 15).
    DB: 0,
}

Aufgaben

In asynq sind Arbeitspakete in einem Typ namens Task verkapselt, der konzeptuell zwei Felder hat: Type und Payload.

// Der Typ ist ein Zeichenfolgenwert, der den Typ der Aufgabe angibt.
func (t *Task) Type() string

// Das Payload ist die für die Aufgabenausführung erforderlichen Daten.
func (t *Task) Payload() []byte

Nachdem wir uns die Kernarten angesehen haben, können wir mit dem Schreiben unserer Programme beginnen.

Client-Programm

In client.go erstellen wir einige Aufgaben und stellen sie mit asynq.Client in die Warteschlange.

Um eine Aufgabe zu erstellen, können Sie die Funktion NewTask verwenden und den Typ und den Payload der Aufgabe übergeben.

Die Methode Enqueue nimmt eine Aufgabe und eine beliebige Anzahl von Optionen entgegen. Verwenden Sie die Optionen ProcessIn oder ProcessAt, um Aufgaben für die zukünftige Verarbeitung zu planen.

// Payload, die mit E-Mail-Aufgaben zusammenhängt.
type EmailTaskPayload struct {
    // ID des E-Mail-Empfängers.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Erstellen einer Aufgabe mit Typnamen und Payload.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Die Aufgaben sofort verarbeiten.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Die Aufgabe wurde erfolgreich in die Warteschlange gestellt: %+v", info)

    // Die Aufgaben nach 24 Stunden verarbeiten.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Die Aufgabe wurde erfolgreich in die Warteschlange gestellt: %+v", info)
}

Das ist alles, was wir für unser Client-Programm brauchen.

Workers Programm

Im workers.go erstellen wir eine Instanz von asynq.Server, um die Arbeiter zu starten.

Die Funktion NewServer nimmt RedisConnOpt und Config als Parameter.

Der Config wird verwendet, um das Verhalten der Aufgabenverarbeitung des Servers anzupassen. Sie können die Config Dokumentation konsultieren, um alle verfügbaren Konfigurationsoptionen kennenzulernen.

In diesem Beispiel beschränken wir uns der Einfachheit halber auf die Konkurrenz.

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Hinweis: Im folgenden Abschnitt werden wir einführen, was `handler` ist.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

Der Parameter der Methode (*Server).Run ist ein Interface asynq.Handler, das eine Methode ProcessTask enthält.

type Handler interface {
    // Wenn die Aufgabe erfolgreich verarbeitet wird, sollte ProcessTask nil zurückgeben.
    // Wenn ProcessTask einen nicht-nil Fehler zurückgibt oder zu einem Panic führt, wird die Aufgabe später erneut versucht.
    ProcessTask(context.Context, *Task) error
}

Der einfachste Weg, einen Handler zu implementieren, besteht darin, eine Funktion mit derselben Signatur zu definieren und den asynq.HandlerFunc Adaptertyp zu verwenden, wenn er an Run übergeben wird.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sende Willkommens-E-Mail an Benutzer %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sende Erinnerungs-E-Mail an Benutzer %d", p.UserID)

    default:
        return fmt.Errorf("Unerwarteter Aufgaben-Typ: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Verwenden Sie den asynq.HandlerFunc-Adapter, um die Funktion zu behandeln
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Wir können weitere switch cases für diese Handler-Funktion hinzufügen, aber in einer tatsächlichen Anwendung ist es praktischer, die Logik für jeden Fall in einer separaten Funktion zu definieren.

Um unseren Code zu überarbeiten, verwenden wir ServeMux, um unseren Handler zu erstellen. Genau wie der ServeMux aus dem Paket "net/http", können Sie einen Handler registrieren, indem Sie Handle oder HandleFunc aufrufen. ServeMux erfüllt das Handler-Interface, sodass es an (*Server).Run übergeben werden kann.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sende Willkommens-E-Mail an Benutzer %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sende Erinnerungs-E-Mail an Benutzer %d", p.UserID)
    return nil
}

Jetzt, da wir die Handling-Funktionen für jeden Typ von Aufgabe extrahiert haben, sieht der Code organisierter aus. Dennoch ist der Code noch ein wenig zu implizit. Wir haben diese Zeichenfolgenwerte für Aufgabentypen und Payload-Typen, und wir sollten sie in einem kohärenten Paket kapseln. Lassen Sie uns unseren Code überarbeiten und ein Paket schreiben, um die Erstellung und Behandlung von Aufgaben zu kapseln. Wir erstellen einfach ein Paket namens task.

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// Liste der Aufgabentypen.
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// Nutzlast für jede mit E-Mails verbundene Aufgabe.
type EmailTaskPayload struct {
    // ID des E-Mail-Empfängers.
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sende Willkommens-E-Mail an Benutzer %d", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sende Erinnerungs-E-Mail an Benutzer %d", p.UserID)
    return nil
}

Jetzt können wir dieses Paket in client.go und workers.go importieren.

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // Fügen Sie die Aufgabe sofort in die Warteschlange ein.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Aufgabe erfolgreich in die Warteschlange gestellt: %+v", info)

    // Fügen Sie die Aufgabe ein, um nach 24 Stunden verarbeitet zu werden.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Aufgabe erfolgreich in die Warteschlange gestellt: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

Der Code sieht jetzt besser aus!

Nun, da wir den client und die workers bereit haben, können wir diese beiden Programme ausführen. Beginnen wir damit, das client-Programm auszuführen, um Aufgaben zu erstellen und zu planen.

go run client/client.go

Dadurch werden zwei Aufgaben erstellt: eine für die sofortige Verarbeitung und eine andere, die nach 24 Stunden verarbeitet wird.

Lassen Sie uns die Befehlszeilenschnittstelle asynq verwenden, um die Aufgaben zu inspizieren.

asynq dash

Sie sollten eine Aufgabe im Zustand In Wartestellung und eine weitere Aufgabe im Zustand Geplant sehen können.

Hinweis: Um die Bedeutung jedes Zustands zu verstehen, lesen Sie bitte das Task Lifecycle (Aufgabenlebenszyklus).

Schließlich starten wir das workers-Programm, um die Aufgaben zu bearbeiten.

go run workers/workers.go

Hinweis: Dieses Programm wird nicht beendet, bis Sie ein Signal senden, um es zu beenden. Für bewährte Verfahren zum sicheren Beenden der Hintergrund-Worker lesen Sie bitte die Signals Wiki-Seite.

Sie sollten einige Textausgaben im Terminal sehen können, die die erfolgreiche Verarbeitung von Aufgaben anzeigen.

Sie können das client-Programm erneut ausführen, um zu sehen, wie die Worker sie akzeptieren und verarbeiten.

Es ist nicht ungewöhnlich, dass eine Aufgabe beim ersten Versuch nicht erfolgreich verarbeitet wird. Standardmäßig werden fehlgeschlagene Aufgaben 25 Mal mit exponentiellem Backoff wiederholt. Lassen Sie uns unseren Handler aktualisieren, um einen Fehler zurückzugeben und eine nicht erfolgreiche Situation zu simulieren.

// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Versuche, eine Willkommens-E-Mail an Benutzer %d zu senden...", p.UserID)
    return fmt.Errorf("Fehler beim Senden der E-Mail an den Benutzer")
}

Lassen Sie uns unser Worker-Programm neu starten und eine Aufgabe in die Warteschlange stellen.

go run workers/workers.go
go run client/client.go

Wenn Sie asynq dash ausführen, sollten Sie eine Aufgabe im Zustand Wiederholen sehen können (indem Sie zur Ansicht der Warteschlangendetails navigieren und den "Wiederholen"-Tab hervorheben).

Um zu überprüfen, welche Aufgaben sich im Wiederholungsstatus befinden, können Sie auch folgendes ausführen:

asynq task ls --queue=default --state=retry

Dadurch werden alle Aufgaben aufgelistet, die in Zukunft wiederholt werden. Die Ausgabe enthält die erwartete Zeit der nächsten Ausführung für jede Aufgabe.

Sobald eine Aufgabe ihre Wiederholungsversuche erschöpft hat, wird sie in den Zustand Archiviert übergehen und nicht erneut wiederholt (Sie können archivierte Aufgaben jedoch weiterhin manuell mit CLI- oder WebUI-Tools ausführen).

Bevor wir dieses Tutorial abschließen, lassen Sie uns unseren Handler reparieren.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sende eine Willkommens-E-Mail an Benutzer %d", p.UserID)
    return nil
}