La funzione di attività unica in Asynq garantisce che ci sia solo un'attività nella coda Redis.
Quando si desidera eliminare le attività duplicate e evitare attività duplicate, questa funzione è molto utile.
Panoramica
Ci sono due metodi per garantire l'unicità dell'attività in Asynq.
- Utilizzando l'opzione
TaskID
: Generare un ID univoco dell'attività da soli. - Utilizzando l'opzione
Unique
: Fare in modo che Asynq crei un blocco di unicità per l'attività.
Utilizzando l'opzione TaskID
Se si sceglie il primo metodo, è possibile garantire che ci sia solo un'attività con un determinato ID attività in un dato momento. Se si tenta di inserire un'altra attività con lo stesso ID attività, restituirà un errore ErrTaskIDConflict
.
// La prima attività dovrebbe andare bene
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
// La seconda attività fallirà, err sarà ErrTaskIDConflict (assumendo che la prima attività non sia ancora stata processata)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))
Utilizzando l'opzione Unique
Il secondo metodo si basa sul blocco di unicità. Quando si accoda un'attività utilizzando l'opzione Unique
, il Client
verificherà se può acquisire il blocco per l'attività specificata. L'attività verrà accodata solo se il blocco può essere acquisito. Se un'altra attività detiene già il blocco, il Client
restituirà un errore (vedi il codice di esempio sotto su come verificare gli errori).
Il blocco di unicità è associato a un TTL (time to live) per evitare di detenere il blocco in modo permanente. Il blocco verrà rilasciato dopo il TTL o dopo che l'attività è stata elaborata con successo prima del TTL.
Una cosa importante da notare è che la funzione di attività unica in Asynq è unicità di massimo sforzo. In altre parole, se il blocco è scaduto prima che l'attività sia stata elaborata, potrebbe essere accodata un'attività duplicata.
L'unicità dell'attività si basa sui seguenti attributi:
- Tipo
- Payload
- Coda
Pertanto, se ci sono attività con lo stesso tipo e payload accodate alla stessa coda, un'altra attività con gli stessi attributi non verrà accodata fino a quando il blocco non viene rilasciato.
c := asynq.NewClient(redis)
t1 := asynq.NewTask("esempio", []byte("ciao"))
// t1 deterrà il blocco di unicità per la prossima ora.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// Gestire attività duplicate
case err != nil:
// Gestire altri errori
}
t2 := asynq.NewTask("esempio", []byte("ciao"))
// t2 non può essere accodato in quanto è un duplicato di t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// Gestire attività duplicate
case err != nil:
// Gestire altri errori
}
Nell'esempio precedente, t2
non sarà accodato in quanto è un duplicato di t1
. È possibile utilizzare errors.Is
per verificare il valore error
restituito e determinare se racchiude l'errore asynq.ErrDuplicateTask
.