Ta strona przedstawia, jak korzystać z klastra Redis jako brokera wiadomości w Asynq.

Zalety korzystania z klastra Redis

Korzystając z klastra Redis, można uzyskać następujące korzyści:

  • Łatwe rozmieszczenie danych na wielu węzłach Redis
  • Utrzymanie dostępności w przypadku awarii określonych węzłów
  • Automatyczne wykonywanie awaryjnego przejęcia

Przegląd

Ilustracja kolejki klastra

Asynq rozdziela dane na podstawie kolejek. Na powyższym diagramie mamy klaster Redis z 6 instancjami (3 masterami i 3 slave'ami) oraz 4 kolejkami (q1, q2, q3, q4).

  • Master1 (oraz jego replika, Slave1) obsługuje q1 i q2.
  • Master2 (oraz jego replika, Slave2) obsługuje q3.
  • Master3 (oraz jego replika, Slave3) obsługuje q4.

Kiedy korzystasz z asynq.Client do umieszczania zadań w kolejce, możesz określić kolejkę za pomocą opcji Queue. Umieszczone zadania będą pobierane przez asynq.Server(y) z tych kolejek.

Samouczek

W tej sekcji przedstawimy, jak używać klastra Redis jako brokera wiadomości dla Asynq. Zakładamy, że masz klaster 6 instancji Redis uruchomiony na portach 7000-7005. Poniżej znajduje się przykład pliku redis.conf:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

Następnie utworzymy dwie pliki binarne: klient i pracownik.

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

W client.go utworzymy nowego klienta asynq.Client i określimy, jak połączyć się z klastrem Redis, przekazując RedisClusterClientOpt.

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

Gdy już mamy klienta, utworzymy zadania i umieścimy je w trzech różnych kolejkach:

  • powiadomienia
  • webhooks
  • obrazy
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// Nazwy kolejek
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // Utwórz zadanie "notifications:email" i umieść je w kolejce "notifications".
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Pomyślnie umieszczono w kolejce: %+v\n", res)

    // Utwórz zadanie "webhooks:sync" i umieść je w kolejce "webhooks".
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Pomyślnie umieszczono w kolejce: %+v\n", res)

    // Utwórz zadanie "images:resize" i umieść je w kolejce "images".
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Pomyślnie umieszczono w kolejce: %+v\n", res)
}

Uruchommy ten program, aby umieścić trzy zadania w kolejkach.

go run client/client.go

Teraz przejdźmy do pracownika, który przetworzy te trzy zadania. W worker.go utworzymy asynq.Server, aby pobierać zadania z tych trzech kolejek. Podobnie, połączmy się z naszym klastrem Redis, używając RedisClusterClientOpt.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// Tutaj ustawiamy tę samą priorytet dla każej kolejki
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", obsłużZadanieEmail)
	mux.HandleFunc("webhooks:sync", obsłużZadanieSynchronizacjiWebhook)
	mux.HandleFunc("images:resize", obsłużZmianęRozmiaruObrazu)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("Nie udało się uruchomić serwera: %v", err)
	}
}

func obsłużZadanieEmail(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("Wysyłanie e-maila od %d do %d\n", from, to)
	return nil
}

func obsłużZadanieSynchronizacjiWebhook(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Obsługa zadania Webhook: %d\n", data)
	return nil
}

func obsłużZmianęRozmiaruObrazu(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("Zmiana rozmiaru obrazu: %s\n", src)
	return nil
}

Uruchommy ten serwer w celu obsłużenia trzech zadań, które wcześniej utworzyliśmy.

go run worker/worker.go

Powinieneś zobaczyć wiadomości wydrukowane z każdego handlera.

Zgodnie z opisem ogólnym, Asynq dzieli dane na podstawie kolejek. Wszystkie zadania dodane do tej samej kolejki należą do tego samego węzła Redis. Więc który węzeł Redis przechowuje których kolejkę?

Możemy użyć interfejsu wiersza poleceń, aby odpowiedzieć na to pytanie.

asynq queue ls --cluster

To polecenie wydrukuje listę kolejek wraz z:

  • Węzłem klastra, do którego kolejka należy
  • Slotem haszującym klastra, do którego kolejka jest mapowana

Wyjście może wyglądać tak:

Kolejka         Klucz slotu klastra   Węzły klastra
-----           ---------------       -------------
images          9450                  [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks        4418                  [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications   16340                 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]