Getting Started Guide

In this tutorial, we will write two programs, client and workers.

  • client.go will create and schedule tasks to be processed asynchronously by background workers.
  • workers.go will start multiple concurrent workers to handle the tasks created by the client.

This guide assumes that you are running a Redis server on localhost:6379. Before getting started, please make sure Redis is installed and running.

Let's first create our two main files.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

Then, install the asynq package.

go get -u github.com/hibiken/asynq

Before we start writing the code, let's review some core types that will be used in these two programs.

Redis Connection Options

Asynq uses Redis as a message broker. Both client.go and workers.go need to connect to Redis for read and write operations. We will use RedisClientOpt to specify the connection to the locally running Redis server.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Password can be omitted if not needed
    Password: "mypassword",
    // Use a dedicated database number for asynq.
    // By default, Redis provides 16 databases (0 to 15).
    DB: 0,
}

Tasks

In asynq, work units are encapsulated in a type called Task, which conceptually has two fields: Type and Payload.

// Type is a string value that indicates the type of the task.
func (t *Task) Type() string

// Payload is the data required for task execution.
func (t *Task) Payload() []byte
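
Because the payload is just a byte slice, the program that creates a task and the program that handles it must agree on an encoding. This guide uses JSON. The sketch below shows the round trip with the standard library only; encodePayload and decodePayload are hypothetical helper names introduced for illustration.

```go
package main

import (
    "encoding/json"
    "fmt"
    "log"
)

// EmailTaskPayload mirrors the payload struct used later in this guide.
type EmailTaskPayload struct {
    UserID int
}

// encodePayload serializes the payload into the bytes a task would carry.
func encodePayload(p EmailTaskPayload) ([]byte, error) {
    return json.Marshal(p)
}

// decodePayload restores the struct from the bytes a handler would receive.
func decodePayload(b []byte) (EmailTaskPayload, error) {
    var p EmailTaskPayload
    err := json.Unmarshal(b, &p)
    return p, err
}

func main() {
    b, err := encodePayload(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("payload bytes: %s\n", b) // payload bytes: {"UserID":42}

    p, err := decodePayload(b)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("decoded user ID: %d\n", p.UserID) // decoded user ID: 42
}
```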

Now that we have looked at the core types, let's start writing our programs.

Client Program

In client.go, we will create some tasks and enqueue them using asynq.Client.

To create a task, you can use the NewTask function and pass the type and payload of the task.

The Enqueue method takes a task and any number of options. Use the ProcessIn or ProcessAt options to schedule tasks for future processing.

// Payload related to email tasks.
type EmailTaskPayload struct {
    // ID of the email recipient.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Create a task with type name and payload.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // Enqueue t1 to be processed immediately.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Successfully enqueued the task: %+v", info)

    // Enqueue t2 to be processed after 24 hours.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Successfully enqueued the task: %+v", info)
}

That's all we need for our client program.

Workers Program

In workers.go, we will create an asynq.Server instance to start the workers.

The NewServer function takes RedisConnOpt and Config as parameters.

The Config is used to adjust the server's task processing behavior. You can refer to the Config documentation to learn about all available configuration options.

For simplicity, in this example, we only specify the concurrency.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Note: `handler` is introduced in the following section.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

The (*Server).Run method takes an asynq.Handler, an interface with a single method, ProcessTask.

type Handler interface {
    // If the task is processed successfully, ProcessTask should return nil.
    // If ProcessTask returns a non-nil error or causes a panic, the task will be retried later.
    ProcessTask(context.Context, *Task) error
}

The simplest way to implement a handler is to define a function with the same signature and use the asynq.HandlerFunc adapter type when passing it to Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending welcome email to user %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending reminder email to user %d", p.UserID)

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Use the asynq.HandlerFunc adapter to turn the function into a Handler.
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}
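
If the adapter idea is unfamiliar, it is the same pattern as http.HandlerFunc in "net/http": a function type with a method, so that a plain function can satisfy an interface. The sketch below reproduces the pattern with local stand-in types. Task, Handler, HandlerFunc, and run are simplified assumptions written for this example and are not asynq's actual definitions.

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// Task is a simplified stand-in for asynq.Task (assumption for illustration).
type Task struct {
    typ     string
    payload []byte
}

func (t *Task) Type() string    { return t.typ }
func (t *Task) Payload() []byte { return t.payload }

// Handler has the same shape as the interface shown above.
type Handler interface {
    ProcessTask(context.Context, *Task) error
}

// HandlerFunc is a function type that satisfies Handler.
type HandlerFunc func(context.Context, *Task) error

// ProcessTask simply calls the underlying function.
func (fn HandlerFunc) ProcessTask(ctx context.Context, t *Task) error {
    return fn(ctx, t)
}

// run is a hypothetical stand-in for a server: it only accepts a Handler.
func run(h Handler, t *Task) error {
    return h.ProcessTask(context.Background(), t)
}

// handle is an ordinary function with the ProcessTask signature.
func handle(ctx context.Context, t *Task) error {
    if t.Type() != "email:welcome" {
        return errors.New("unexpected task type: " + t.Type())
    }
    return nil
}

func main() {
    // The conversion HandlerFunc(handle) is what makes handle a Handler.
    fmt.Println(run(HandlerFunc(handle), &Task{typ: "email:welcome"})) // <nil>
    fmt.Println(run(HandlerFunc(handle), &Task{typ: "email:unknown"})) // unexpected task type: email:unknown
}
```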

We can continue to add switch cases for this handler function, but in an actual application, it will be more convenient to define the logic for each case in a separate function.

To refactor our code, let's use ServeMux to create our handler. Just like the ServeMux from the "net/http" package, you can register a handler by calling Handle or HandleFunc. ServeMux satisfies the Handler interface, so it can be passed to (*Server).Run.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending welcome email to user %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending reminder email to user %d", p.UserID)
    return nil
}

Now that we have extracted a handler function for each task type, the code looks more organized. However, it still relies on implicit conventions: the task type strings and the payload type are scattered across the client and workers programs, and they should be encapsulated in a cohesive package. Let's refactor our code and write a package, called task, that encapsulates task creation and handling.

mkdir task && touch task/task.go

package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// List of task types.
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// Payload for any task related to emails.
type EmailTaskPayload struct {
    // ID of the email recipient.
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending welcome email to user %d", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending reminder email to user %d", p.UserID)
    return nil
}

Now we can import this package in client.go and workers.go.

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // Enqueue the task immediately.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Task successfully enqueued: %+v", info)

    // Enqueue the task to be processed after 24 hours.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] Task successfully enqueued: %+v", info)
}

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

The code looks better now!

Running the Program

Now that we have the client and workers ready, we can run these two programs. Let's start by running the client program to create and schedule tasks.

go run client/client.go

This will create two tasks: one for immediate processing and another for processing after 24 hours.

Let's use the asynq command-line interface to inspect the tasks.

asynq dash

You should be able to see one task in the Pending state and another task in the Scheduled state.

Note: To understand what each state means, please refer to the Task Lifecycle.

Finally, let's start the workers program to handle the tasks.

go run workers/workers.go

Note: This program will not exit until you send a signal to terminate it. For best practices on how to safely terminate the background workers, please refer to the Signals Wiki page.

You should be able to see some text output in the terminal, indicating the successful processing of tasks.

You can run the client program again to see how the workers pick up and process the new tasks.

Task Retry

It is not uncommon for a task to fail to process successfully on the first attempt. By default, failed tasks will be retried 25 times with exponential backoff. Let's update our handler to return an error to simulate an unsuccessful situation.

// task.go (add "fmt" to the import list)
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Attempting to send a welcome email to user %d...", p.UserID)
    // Always fail, to simulate an unsuccessful attempt.
    return fmt.Errorf("failed to send email to the user")
}

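Let's restart our workers program and enqueue a task. Run the two commands in separate terminals, since the workers program keeps running until it is terminated.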

go run workers/workers.go
go run client/client.go

If you are running asynq dash, you should be able to see a task in the Retry state (by navigating to the queue details view and highlighting the "retry" tab).

To check which tasks are in the retry state, you can also run:

asynq task ls --queue=default --state=retry

This will list all the tasks that will be retried in the future. The output includes the expected time of the next execution for each task.

Once a task has exhausted its retry attempts, it will transition to the Archived state and will not be retried again (you can still manually run archived tasks using CLI or WebUI tools).

Before concluding this tutorial, let's fix our handler.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending a welcome email to user %d", p.UserID)
    return nil
}

Now that we have fixed the handler, the task will be successfully processed in the next attempt :)