Przewodnik po rozpoczęciu
W tym samouczku napiszemy dwa programy: client
oraz workers
.
-
client.go
utworzy i zaplanuje zadania do przetworzenia asynchronicznie przez wątki robocze w tle. -
workers.go
uruchomi kilka równoczesnych wątków roboczych do obsługi zadań utworzonych przez klienta.
Ten przewodnik zakłada, że uruchamiasz serwer Redis na localhost:6379
. Przed rozpoczęciem upewnij się, że Redis jest zainstalowany i uruchomiony.
Zacznijmy od utworzenia dwóch głównych plików.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
Następnie zainstaluj pakiet asynq
.
go get -u github.com/hibiken/asynq
Zanim zaczniemy pisać kod, przejrzyjmy kilka podstawowych typów, które będą używane w tych dwóch programach.
Opcje Połączenia z Redis
Asynq używa Redis jako brokera wiadomości. Zarówno client.go
, jak i workers.go
muszą łączyć się z Redis, aby wykonywać operacje odczytu i zapisu. Użyjemy RedisClientOpt
, aby określić połączenie z lokalnie działającym serwerem Redis.
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// Hasło można pomijać, jeśli nie jest wymagane
Password: "mojehaslo",
// Użyj dedykowanego numeru bazy danych dla asynq.
// Domyślnie Redis dostarcza 16 baz danych (od 0 do 15).
DB: 0,
}
Zadania
W asynq
, jednostki pracy są zawarte w typie o nazwie Task
, który koncepcyjnie ma dwa pola: Type
i Payload
.
// Type jest łańcuchową wartością wskazującą typ zadania.
func (t *Task) Type() string
// Payload to dane wymagane do wykonania zadania.
func (t *Task) Payload() []byte
Teraz, gdy przyjrzeliśmy się podstawowym typom, zacznijmy pisać nasze programy.
Program Klienta
W client.go
tworzymy pewne zadania i umieszczamy je w kolejce za pomocą asynq.Client
.
Aby utworzyć zadanie, możesz użyć funkcji NewTask
i przekazać typ i ładunek zadania.
Metoda Enqueue
przyjmuje zadanie i dowolną liczbę opcji. Użyj opcji ProcessIn
lub ProcessAt
, aby zaplanować zadania do przetworzenia w przyszłości.
// Payload związany z zadaniami e-mail.
type EmailTaskPayload struct {
// ID odbiorcy e-maila.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// Utwórz zadanie o podanej nazwie i ładunku.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// Przetwórz zadania natychmiast.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Pomyślnie umieszczono zadanie w kolejce: %+v", info)
// Przetwórz zadania po 24 godzinach.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Pomyślnie umieszczono zadanie w kolejce: %+v", info)
}
To wszystko, co potrzebujemy dla naszego programu klienta.
Program Pracowników
W pliku workers.go
utworzymy instancję asynq.Server
, aby uruchomić pracowników.
Funkcja NewServer
przyjmuje RedisConnOpt
oraz Config
jako parametry.
Config
jest używany do dostosowania zachowania przetwarzania zadań serwera.
Możesz odwołać się do dokumentacji Config
, aby dowiedzieć się o wszystkich dostępnych opcjach konfiguracji.
Dla uproszczenia, w tym przykładzie określamy tylko konkurencyjność.
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Uwaga: W następnym rozdziale przedstawimy, co to jest `handler`.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
Parametrem metody (*Server).Run
jest interfejs asynq.Handler
, który posiada metodę ProcessTask
.
type Handler interface {
// Jeśli zadanie zostanie pomyślnie przetworzone, ProcessTask powinno zwrócić nil.
// Jeśli ProcessTask zwróci nie-nilowy błąd lub spowoduje panikę, zadanie zostanie ponownie przetworzone później.
ProcessTask(context.Context, *Task) error
}
Najprostszym sposobem zaimplementowania obsługi jest zdefiniowanie funkcji o tej samej sygnaturze i użycie typu adaptera asynq.HandlerFunc
podczas jej przekazywania do Run
.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Wysyłanie wiadomości powitalnej do użytkownika %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Wysyłanie przypomnienia do użytkownika %d", p.UserID)
default:
return fmt.Errorf("Nieoczekiwany typ zadania: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Użyj adaptera asynq.HandlerFunc do obsługi funkcji
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
Możemy kontynuować dodawanie instrukcji warunkowych dla tej funkcji obsługi, ale w rzeczywistej aplikacji będzie wygodniej zdefiniować logikę dla każdego przypadku w osobnej funkcji.
Aby zrefaktorować nasz kod, użyjmy ServeMux
do utworzenia naszej obsługi. Tak jak ServeMux
z pakietu "net/http"
, można zarejestrować obsługę, wywołując Handle
lub HandleFunc
. ServeMux
spełnia interfejs Handler
, więc można go przekazać do (*Server).Run
.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Wysyłanie wiadomości powitalnej do użytkownika %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Wysyłanie przypomnienia do użytkownika %d", p.UserID)
return nil
}
Teraz, gdy wyciągnęliśmy funkcje obsługi dla każdego typu zadania, kod wygląda bardziej uporządkowany. Jednakże kod jest wciąż trochę zbyt niejawny. Mamy te wartości ciągów dla typów zadań i typów ładunków, i powinniśmy je zawrzeć w organicznym pakiecie. Zrefaktorujmy nasz kod i napiszmy pakiet do enkapsulacji tworzenia zadań i ich obsługi. Wystarczy utworzyć pakiet o nazwie task
.
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// Lista typów zadań.
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// Dane ładunku dla zadań związanym z e-mailami.
type EmailTaskPayload struct {
// ID odbiorcy e-maila.
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Wysyłanie wiadomości powitalnej do użytkownika %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Wysyłanie przypomnienia drogą e-mailową do użytkownika %d", p.UserID)
return nil
}
Teraz możemy zaimportować ten pakiet do client.go
oraz workers.go
.
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// Natychmiast dodaj zadanie do kolejki.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Zadanie pomyślnie dodane do kolejki: %+v", info)
// Dodaj zadanie do przetworzenia po 24 godzinach.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Zadanie pomyślnie dodane do kolejki: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
Kod wygląda teraz lepiej!
Mając gotowe client
oraz workers
, możemy uruchomić te dwa programy. Zacznijmy od uruchomienia programu client
, aby utworzyć i zaplanować zadania.
go run client/client.go
Spowoduje to utworzenie dwóch zadań: jednego do natychmiastowego przetworzenia oraz drugiego do przetworzenia po 24 godzinach.
Skorzystajmy z interfejsu wiersza poleceń asynq
, aby sprawdzić zadania.
asynq dash
Powinieneś zobaczyć jedno zadanie w stanie Enqueued (Dodane do kolejki) oraz inne zadanie w stanie Scheduled (Zaplanowane).
Uwaga: Aby zrozumieć znaczenie każdego stanu, proszę odnieść się do Cyklu życia zadania.
Na koniec uruchommy program workers
, aby obsłużył zadania.
go run workers/workers.go
Uwaga: Ten program nie zakończy działania, dopóki nie zostanie wysłany sygnał do zakończenia. Aby zapoznać się z praktykami dotyczącymi bezpiecznego zatrzymywania tła workers, proszę odnieść się do strony Wiki Sygnały.
Powinieneś zobaczyć tekstowe wyjście w terminalu, oznaczające pomyślne przetworzenie zadań.
Możesz ponownie uruchomić program client
, aby zobaczyć, jak pracownicy je przyjmują i przetwarzają.
Nie jest rzadkością, że zadanie nie będzie mogło zostać przetworzone pomyślnie za pierwszym razem. Domyślnie nieudane zadania będą ponawiane 25 razy, z wykładniczym odstępem czasu. Zaktualizujmy nasz obsługiwacz, aby zwrócił błąd i zasymulował niepomyślną sytuację.
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Próba wysłania wiadomości powitalnej do użytkownika %d...", p.UserID)
return fmt.Errorf("Nie udało się wysłać wiadomości e-mail do użytkownika")
}
Uruchom ponownie program workers i dodaj zadanie do kolejki.
go run workers/workers.go
go run client/client.go
Jeśli uruchamiasz asynq dash
, powinieneś zobaczyć zadanie w stanie Retry (przechodząc do widoku szczegółów kolejki i zaznaczając zakładkę "retry").
Aby sprawdzić, które zadania znajdują się w stanie ponownej próby, możesz również użyć polecenia:
asynq task ls --queue=default --state=retry
Spowoduje to wyświetlenie wszystkich zadań, które będą ponownie próbowane w przyszłości. Wynik będzie zawierał planowany czas kolejnego wykonania dla każdego zadania.
Gdy zadanie wyczerpie wszystkie próby ponownego wykonania, przejdzie ono do stanu Archived i nie będzie ponownie wykonywane (nadawca nadal może ręcznie uruchomić zarchiwizowane zadania stosując narzędzia CLI lub WebUI).
Przed zakończeniem tego samouczka, poprawmy nasz manipulator.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Wysłanie wiadomości powitalnej do użytkownika %d", p.UserID)
return nil
}