시작 가이드

이 튜토리얼에서는 clientworkers라는 두 개의 프로그램을 작성할 것입니다.

  • client.go는 백그라운드 워커 스레드에 의해 비동기적으로 처리될 작업을 생성하고 예약할 것입니다.
  • workers.go는 클라이언트가 생성한 작업을 처리하기 위해 여러 개의 동시 워커 스레드를 시작할 것입니다.

이 가이드는 localhost:6379에서 Redis 서버가 실행 중이라고 가정합니다. 시작하기 전에 Redis가 설치되어 실행 중인지 확인하십시오.

먼저 두 개의 주 파일을 만들어 봅시다.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

그런 다음, asynq 패키지를 설치합니다.

go get -u github.com/hibiken/asynq

코드를 작성하기 전에 두 프로그램에서 사용될 일부 핵심 유형을 검토해 봅시다.

Redis 연결 옵션

Asynq는 메시지 브로커로 Redis를 사용합니다. client.goworkers.go는 모두 Redis에 연결하여 읽기 및 쓰기 작업을 해야 합니다. 우리는 로컬에서 실행 중인 Redis 서버에 대한 연결을 지정하기 위해 RedisClientOpt을 사용할 것입니다.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // 필요한 경우 비밀번호가 생략될 수 있습니다.
    Password: "mypassword",
    // asynq를 위한 전용 데이터베이스 번호를 사용합니다.
    // 기본적으로 Redis는 16개의 데이터베이스(0부터 15)를 제공합니다.
    DB: 0,
}

작업

asynq에서 작업은 Task라는 유형에 캡슐화되며, 개념적으로 TypePayload라는 두 필드가 있습니다.

// Type는 작업의 유형을 나타내는 문자열 값입니다.
func (t *Task) Type() string

// Payload는 작업 실행에 필요한 데이터입니다.
func (t *Task) Payload() []byte

핵심 유형을 살펴보았으니, 이제 프로그램 작성을 시작해 봅시다.

클라이언트 프로그램

client.go에서는 asynq.Client를 사용하여 일부 작업을 생성하고 그것들을 큐에 넣을 것입니다.

작업을 생성하려면 NewTask 함수를 사용하여 작업의 유형과 페이로드를 전달할 수 있습니다.

Enqueue 메서드는 작업과 옵션의 어떤 수량도 취할 수 있습니다. 작업을 미래 처리를 위해 예약하려면 ProcessIn 또는 ProcessAt 옵션을 사용하세요.

// 이메일 작업과 관련된 페이로드.
type EmailTaskPayload struct {
    // 이메일 수신자의 ID.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // 타입 이름과 페이로드로 작업 생성.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // 작업들을 즉시 처리합니다.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] 작업을 성공적으로 큐에 넣었습니다: %+v", info)

    // 24시간 후에 작업을 처리합니다.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] 작업을 성공적으로 큐에 넣었습니다: %+v", info)
}

이것으로 클라이언트 프로그램에 필요한 모든 것이 준비되었습니다.

Workers Program

workers.go에서 asynq.Server 인스턴스를 생성하여 workers를 시작합니다.

NewServer 함수는 RedisConnOptConfig를 매개변수로 사용합니다.

Config는 서버의 작업 처리 동작을 조정하는 데 사용됩니다. 모든 사용 가능한 구성 옵션에 대해 알아보려면 Config 문서를 참조할 수 있습니다.

간단히 하기 위해 예제에서는 일관성만 지정합니다.

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // 참고: 다음 섹션에서 `handler`가 무엇인지 소개할 것입니다.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

(*Server).Run 메서드의 매개변수는 asynq.Handler 인터페이스이며, 이는 ProcessTask 메서드를 가지고 있습니다.

type Handler interface {
    // 작업이 성공적으로 처리되면 ProcessTask는 nil을 반환해야 합니다.
    // ProcessTask가 nil이 아닌 오류를 반환하거나 패닉을 유발하면 작업이 나중에 다시 시도됩니다.
    ProcessTask(context.Context, *Task) error
}

핸들러를 구현하는 가장 간단한 방법은 동일한 시그니처를 가진 함수를 정의하고, Run에 전달할 때 asynq.HandlerFunc 어댑터 유형을 사용하는 것입니다.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending welcome email to user %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending reminder email to user %d", p.UserID)

    default:
        return fmt.Errorf("Unexpected task type: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // 함수를 처리하기 위해 asynq.HandlerFunc 어댑터를 사용합니다.
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

이 핸들러 함수에 대해 추가적으로 switch 케이스를 계속 추가할 수 있지만, 실제 응용 프로그램에서는 각 케이스에 대한 로직을 별도의 함수로 정의하는 것이 더 편리할 것입니다.

코드를 리팩터링하려면 ServeMux를 사용하여 핸들러를 만드는 것이 좋습니다. "net/http" 패키지의 ServeMux와 마찬가지로 Handle 또는 HandleFunc를 호출하여 핸들러를 등록할 수 있습니다. ServeMuxHandler 인터페이스를 충족하므로 (*Server).Run에 전달할 수 있습니다.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending welcome email to user %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending reminder email to user %d", p.UserID)
    return nil
}

각 유형의 작업을 처리하기 위해 핸들러 함수를 추출했으므로 코드가 더 구성 정돈되어 보입니다. 그러나 여전히 코드가 너무 암시적으로 작성되었습니다. 작업 유형 및 페이로드 유형을 나타내는 문자열 값을 캡슐화해야 하며, 이를 위해 유기적인 패키지를 작성해야 합니다. 코드를 리팩터링하고 작업 생성 및 처리를 캡슐화하기 위해 패키지를 작성합니다. 간단히 말해서 task라는 패키지를 만들어보겠습니다.

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// 작업 유형 목록.
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// 이메일과 관련된 모든 작업의 페이로드.
type EmailTaskPayload struct {
    // 이메일 수신자의 ID.
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 사용자 %d님에게 환영 이메일을 전송", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 사용자 %d님에게 리마인더 이메일을 전송", p.UserID)
    return nil
}

이제 client.goworkers.go에서 이 패키지를 가져올 수 있습니다.

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // 작업을 즉시 대기열에 추가합니다.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] 작업이 성공적으로 대기열에 추가되었습니다: %+v", info)

    // 24시간 후에 처리될 작업을 대기열에 추가합니다.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] 작업이 성공적으로 대기열에 추가되었습니다: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

코드가 더 깔끔해졌습니다!

이제 clientworkers가 준비되었으므로, 두 프로그램을 실행할 수 있습니다. 먼저 client 프로그램을 실행하여 작업을 생성하고 예약해봅시다.

go run client/client.go

이렇게 하면 즉시 처리될 작업과 24시간 후에 처리될 작업 두 개가 생성됩니다.

asynq 명령줄 인터페이스를 사용하여 작업을 검사해봅시다.

asynq dash

대기 중 상태의 작업 하나와 예약됨 상태의 작업 하나가 보이는지 확인할 수 있어야 합니다.

참고: 각 상태의 의미를 이해하려면 작업 수명주기를 참조하십시오.

마지막으로, workers 프로그램을 실행하여 작업을 처리해봅시다.

go run workers/workers.go

참고: 이 프로그램은 종료 신호를 받을 때까지 종료되지 않습니다. 백그라운드 워커를 안전하게 종료하는 방법에 대한 모범 사례는 시그널 위키 페이지를 참조하십시오.

터미널에 일부 텍스트 출력이 표시되면 작업이 성공적으로 처리된 것입니다.

client 프로그램을 다시 실행하여 워커가 작업을 받아들이고 처리하는 방법을 확인할 수 있습니다.

첫 번째 시도에서 작업이 성공적으로 처리되지 않을 수도 있습니다. 기본적으로 작업이 25회 지수 백오프로 다시 시도됩니다. 성공적으로 처리되지 않은 상황을 시뮬레이션하기 위해 핸들러를 업데이트하여 오류를 반환하도록 해봅시다.

// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 사용자 %d에게 환영 이메일을 보내는 중...", p.UserID)
    return fmt.Errorf("사용자에게 이메일을 보내지 못했습니다")
}

작업자 프로그램을 재시작하고 작업을 대기열에 추가합시다.

go run workers/workers.go
go run client/client.go

만약 asynq dash를 실행 중이라면, 대기열 세부 정보 보기로 이동하여 "retry" 탭을 강조하여 Retry 상태의 작업을 확인할 수 있어야 합니다.

재시도 상태의 작업을 확인하려면 다음 명령을 실행할 수도 있습니다.

asynq task ls --queue=default --state=retry

이것은 미래에 재시도 될 모든 작업을 나열합니다. 출력에는 각 작업의 다음 실행 예정 시간도 포함되어 있습니다.

한 작업이 재시도를 모두 소진하면 Archived 상태로 전환되어 더 이상 재시도되지 않습니다(아카이브된 작업은 CLI 또는 WebUI 도구를 사용하여 수동으로 실행할 수 있습니다).

튜토리얼을 마무리하기 전에 핸들러를 수정해 봅시다.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 사용자 %d에게 환영 이메일을 보내는 중", p.UserID)
    return nil 
}