Questa pagina descrive come configurare il ripetitore delle attività.

Comportamento predefinito

Per impostazione predefinita, asynq riproverà un'attività per un massimo di 25 volte. Ogni volta che l'attività viene riprovata, utilizza una strategia di ritardo esponenziale per calcolare il ritardo del ripetitore. Se un'attività esaurisce tutti i tentativi di ripetizione (il default è 25 volte), l'attività verrà spostata allo stato archiviato per scopi di debug e ispezione, e non verrà ripetuta automaticamente (è comunque possibile eseguire manualmente l'attività usando la CLI o WebUI).

Le seguenti proprietà di ripetizione delle attività possono essere personalizzate:

  • Numero massimo di tentativi di ripetizione per ogni attività
  • Intervallo di tempo da attendere prima che un'attività fallita possa essere ripetuta (ovvero, ritardo)
  • Se utilizzare il conteggio di ripetizione dell'attività
  • Se saltare le ripetizioni ed inviare direttamente l'attività all'archivio

Le parti restanti di questa pagina descrivono ciascuna opzione personalizzata menzionata in precedenza.

Personalizzazione del numero massimo di tentativi di ripetizione per le attività

È possibile specificare il numero massimo di tentativi di ripetizione per un'attività durante l'inserimento dell'attività utilizzando l'opzione asynq.MaxRetry.

Esempio:

client.Enqueue(task, asynq.MaxRetry(5))

Questo indica che l'attività dovrebbe essere ripetuta al massimo cinque volte.

In alternativa, se si desidera impostare il numero massimo di tentativi di ripetizione per una particolare attività, è possibile impostarlo come opzione predefinita per l'attività.

task := asynq.NewTask("feed:import", nil, asynq.MaxRetry(5))
client.Enqueue(task) // MaxRetry impostato su 5

Personalizzazione del ritardo di ripetizione

È possibile specificare come calcolare il ritardo di ripetizione utilizzando l'opzione RetryDelayFunc nella struttura Config.

La firma di RetryDelayFunc è la seguente:

// n è il numero di volte che l'attività è stata ripetuta
// e è l'errore restituito dall'elaboratore dell'attività
// t è l'attività correlata
RetryDelayFunc func(n int, e error, t *asynq.Task) time.Duration

Esempio:

srv := asynq.NewServer(redis, asynq.Config{
    Concurrency: 20,
    RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
        return 2 * time.Second
    },
})

Questo indica che tutte le attività fallite aspetteranno due secondi prima di essere processate nuovamente.

Il comportamento predefinito è un ritardo esponenziale, definito da DefaultRetryDelayFunc. L'esempio seguente illustra come personalizzare il ritardo di ripetizione per tipi di attività specifici:

srv := asynq.NewServer(redis, asynq.Config{
   // Per le attività "foo", utilizzare sempre un ritardo di 2 secondi, le altre attività utilizzano il comportamento predefinito.
    RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
        if t.Type() == "foo" {
            return 2 * time.Second 
        }
        return asynq.DefaultRetryDelayFunc(n, e, t) 
    },
})

Errore non di fallimento

A volte si potrebbe desiderare di restituire un errore dall'elaboratore e ripetere l'attività in seguito, ma senza utilizzare il conteggio di ripetizione dell'attività. Ad esempio, si potrebbe voler riprovare in seguito perché l'unità di lavoro non dispone delle risorse sufficienti per gestire l'attività. Durante l'inizializzazione del server, è possibile scegliere di fornire una Config per la funzione IsFailure(error) bool. Questa funzione predicativa determina se l'errore restituito dall'elaboratore conta come un fallimento. Se la funzione restituisce false (ovvero un errore non di fallimento), il server non utilizzerà il conteggio di ripetizione dell'attività e pianificherà semplicemente l'attività per un successivo ripetizione.

Esempio:

var ErrResourceNotAvailable = errors.New("non ci sono risorse disponibili")

func HandleResourceIntensiveTask(ctx context.Context, task *asynq.Task) error {
    if !IsResourceAvailable() {
        return ErrResourceNotAvailable
    }
    // ... logica per gestire l'attività intensiva in risorse
}

// ...

srv := asynq.NewServer(redisConnOpt, asynq.Config{
    // ... altre opzioni di configurazione
    IsFailure: func(err error) bool {
        return err != ErrResourceNotAvailable // errore non di fallimento se non ci sono risorse disponibili
    },
})

Saltare il ripetitore

Se Handler.ProcessTask restituisce un errore SkipRetry, l'attività verrà archiviata indipendentemente dal conteggio di ripetizione rimanente. L'errore restituito può essere SkipRetry o un errore racchiuso con un errore SkipRetry.

func ExampleHandler(ctx context.Context, task *asynq.Task) error {
    // Logica di gestione dell'attività qui...
    // Se l'elaboratore sa che l'attività non dovrebbe essere ripetuta, restituire SkipRetry
    return fmt.Errorf(": %w", asynq.SkipRetry)
}