Przewodnik po rozpoczęciu

W tym samouczku napiszemy dwa programy: client oraz workers.

  • client.go utworzy i zaplanuje zadania do przetworzenia asynchronicznie przez wątki robocze w tle.
  • workers.go uruchomi kilka równoczesnych wątków roboczych do obsługi zadań utworzonych przez klienta.

Ten przewodnik zakłada, że uruchamiasz serwer Redis na localhost:6379. Przed rozpoczęciem upewnij się, że Redis jest zainstalowany i uruchomiony.

Zacznijmy od utworzenia dwóch głównych plików.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Następnie zainstaluj pakiet asynq.

go get -u github.com/hibiken/asynq

Zanim zaczniemy pisać kod, przejrzyjmy kilka podstawowych typów, które będą używane w tych dwóch programach.

Opcje Połączenia z Redis

Asynq używa Redis jako brokera wiadomości. Zarówno client.go, jak i workers.go muszą łączyć się z Redis, aby wykonywać operacje odczytu i zapisu. Użyjemy RedisClientOpt, aby określić połączenie z lokalnie działającym serwerem Redis.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Hasło można pomijać, jeśli nie jest wymagane
    Password: "mojehaslo",
    // Użyj dedykowanego numeru bazy danych dla asynq.
    // Domyślnie Redis dostarcza 16 baz danych (od 0 do 15).
    DB: 0,
}

Zadania

W asynq, jednostki pracy są zawarte w typie o nazwie Task, który koncepcyjnie ma dwa pola: Type i Payload.

// Type jest łańcuchową wartością wskazującą typ zadania.
func (t *Task) Type() string

// Payload to dane wymagane do wykonania zadania.
func (t *Task) Payload() []byte

Teraz, gdy przyjrzeliśmy się podstawowym typom, zacznijmy pisać nasze programy.

Program Klienta

W client.go tworzymy pewne zadania i umieszczamy je w kolejce za pomocą asynq.Client.

Aby utworzyć zadanie, możesz użyć funkcji NewTask i przekazać typ i ładunek zadania.

Metoda Enqueue przyjmuje zadanie i dowolną liczbę opcji. Użyj opcji ProcessIn lub ProcessAt, aby zaplanować zadania do przetworzenia w przyszłości.

// Payload związany z zadaniami e-mail.
type EmailTaskPayload struct {
    // ID odbiorcy e-maila.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Utwórz zadanie o podanej nazwie i ładunku.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Przetwórz zadania natychmiast.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Pomyślnie umieszczono zadanie w kolejce: %+v", info)

    // Przetwórz zadania po 24 godzinach.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Pomyślnie umieszczono zadanie w kolejce: %+v", info)
}

To wszystko, co potrzebujemy dla naszego programu klienta.

Program Pracowników

W pliku workers.go utworzymy instancję asynq.Server, aby uruchomić pracowników.

Funkcja NewServer przyjmuje RedisConnOpt oraz Config jako parametry.

Config jest używany do dostosowania zachowania przetwarzania zadań serwera. Możesz odwołać się do dokumentacji Config, aby dowiedzieć się o wszystkich dostępnych opcjach konfiguracji.

Dla uproszczenia, w tym przykładzie określamy tylko konkurencyjność.

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Uwaga: W następnym rozdziale przedstawimy, co to jest `handler`.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

Parametrem metody (*Server).Run jest interfejs asynq.Handler, który posiada metodę ProcessTask.

type Handler interface {
    // Jeśli zadanie zostanie pomyślnie przetworzone, ProcessTask powinno zwrócić nil.
    // Jeśli ProcessTask zwróci nie-nilowy błąd lub spowoduje panikę, zadanie zostanie ponownie przetworzone później.
    ProcessTask(context.Context, *Task) error
}

Najprostszym sposobem zaimplementowania obsługi jest zdefiniowanie funkcji o tej samej sygnaturze i użycie typu adaptera asynq.HandlerFunc podczas jej przekazywania do Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Wysyłanie wiadomości powitalnej do użytkownika %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Wysyłanie przypomnienia do użytkownika %d", p.UserID)

    default:
        return fmt.Errorf("Nieoczekiwany typ zadania: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Użyj adaptera asynq.HandlerFunc do obsługi funkcji
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Możemy kontynuować dodawanie instrukcji warunkowych dla tej funkcji obsługi, ale w rzeczywistej aplikacji będzie wygodniej zdefiniować logikę dla każdego przypadku w osobnej funkcji.

Aby zrefaktorować nasz kod, użyjmy ServeMux do utworzenia naszej obsługi. Tak jak ServeMux z pakietu "net/http", można zarejestrować obsługę, wywołując Handle lub HandleFunc. ServeMux spełnia interfejs Handler, więc można go przekazać do (*Server).Run.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Wysyłanie wiadomości powitalnej do użytkownika %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Wysyłanie przypomnienia do użytkownika %d", p.UserID)
    return nil
}

Teraz, gdy wyciągnęliśmy funkcje obsługi dla każdego typu zadania, kod wygląda bardziej uporządkowany. Jednakże kod jest wciąż trochę zbyt niejawny. Mamy te wartości ciągów dla typów zadań i typów ładunków, i powinniśmy je zawrzeć w organicznym pakiecie. Zrefaktorujmy nasz kod i napiszmy pakiet do enkapsulacji tworzenia zadań i ich obsługi. Wystarczy utworzyć pakiet o nazwie task.

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// Lista typów zadań.
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// Dane ładunku dla zadań związanym z e-mailami.
type EmailTaskPayload struct {
    // ID odbiorcy e-maila.
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Wysyłanie wiadomości powitalnej do użytkownika %d", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Wysyłanie przypomnienia drogą e-mailową do użytkownika %d", p.UserID)
    return nil
}

Teraz możemy zaimportować ten pakiet do client.go oraz workers.go.

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // Natychmiast dodaj zadanie do kolejki.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Zadanie pomyślnie dodane do kolejki: %+v", info)

    // Dodaj zadanie do przetworzenia po 24 godzinach.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Zadanie pomyślnie dodane do kolejki: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

Kod wygląda teraz lepiej!

Mając gotowe client oraz workers, możemy uruchomić te dwa programy. Zacznijmy od uruchomienia programu client, aby utworzyć i zaplanować zadania.

go run client/client.go

Spowoduje to utworzenie dwóch zadań: jednego do natychmiastowego przetworzenia oraz drugiego do przetworzenia po 24 godzinach.

Skorzystajmy z interfejsu wiersza poleceń asynq, aby sprawdzić zadania.

asynq dash

Powinieneś zobaczyć jedno zadanie w stanie Enqueued (Dodane do kolejki) oraz inne zadanie w stanie Scheduled (Zaplanowane).

Uwaga: Aby zrozumieć znaczenie każdego stanu, proszę odnieść się do Cyklu życia zadania.

Na koniec uruchommy program workers, aby obsłużył zadania.

go run workers/workers.go

Uwaga: Ten program nie zakończy działania, dopóki nie zostanie wysłany sygnał do zakończenia. Aby zapoznać się z praktykami dotyczącymi bezpiecznego zatrzymywania tła workers, proszę odnieść się do strony Wiki Sygnały.

Powinieneś zobaczyć tekstowe wyjście w terminalu, oznaczające pomyślne przetworzenie zadań.

Możesz ponownie uruchomić program client, aby zobaczyć, jak pracownicy je przyjmują i przetwarzają.

Nie jest rzadkością, że zadanie nie będzie mogło zostać przetworzone pomyślnie za pierwszym razem. Domyślnie nieudane zadania będą ponawiane 25 razy, z wykładniczym odstępem czasu. Zaktualizujmy nasz obsługiwacz, aby zwrócił błąd i zasymulował niepomyślną sytuację.

// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Próba wysłania wiadomości powitalnej do użytkownika %d...", p.UserID)
    return fmt.Errorf("Nie udało się wysłać wiadomości e-mail do użytkownika")
}

Uruchom ponownie program workers i dodaj zadanie do kolejki.

go run workers/workers.go
go run client/client.go

Jeśli uruchamiasz asynq dash, powinieneś zobaczyć zadanie w stanie Retry (przechodząc do widoku szczegółów kolejki i zaznaczając zakładkę "retry").

Aby sprawdzić, które zadania znajdują się w stanie ponownej próby, możesz również użyć polecenia:

asynq task ls --queue=default --state=retry

Spowoduje to wyświetlenie wszystkich zadań, które będą ponownie próbowane w przyszłości. Wynik będzie zawierał planowany czas kolejnego wykonania dla każdego zadania.

Gdy zadanie wyczerpie wszystkie próby ponownego wykonania, przejdzie ono do stanu Archived i nie będzie ponownie wykonywane (nadawca nadal może ręcznie uruchomić zarchiwizowane zadania stosując narzędzia CLI lub WebUI).

Przed zakończeniem tego samouczka, poprawmy nasz manipulator.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Wysłanie wiadomości powitalnej do użytkownika %d", p.UserID)
    return nil 
}