1. Rola mechanizmów synchronizacji
W programowaniu współbieżnym, gdy wiele goroutine dzieli zasoby, konieczne jest zapewnienie, że zasoby mogą być dostępne tylko dla jednej goroutine w danym czasie, aby zapobiec warunkom wyścigowym. Wymaga to użycia mechanizmów synchronizacji. Mechanizmy synchronizacji mogą koordynować kolejność dostępu różnych goroutine do współdzielonych zasobów, zapewniając spójność danych i synchronizację stanu w środowisku współbieżnym.
Język Go zapewnia bogaty zestaw mechanizmów synchronizacji, w tym między innymi:
- Mutexy (sync.Mutex) i mutexy do odczytu i zapisu (sync.RWMutex)
- Kanały
- Grupy oczekiwania (WaitGroups)
- Funkcje atomowe (pakiet atomic)
- Zmienne warunkowe (sync.Cond)
2. Podstawy synchronizacji
2.1 Mutex (sync.Mutex)
2.1.1 Pojęcie i rola mutexa
Mutex to mechanizm synchronizacji, który zapewnia bezpieczne operacje na współdzielonych zasobach, pozwalając tylko jednej goroutine na blokowanie dostępu do współdzielonego zasobu w danym momencie. Mutex osiąga synchronizację poprzez metody Lock
i Unlock
. Wywołanie metody Lock
zablokuje do momentu zwolnienia blokady, w którym to punkcie inne gorutyny próbujące uzyskać blokadę będą czekać. Wywołanie Unlock
zwalnia blokadę, pozwalając innym oczekującym gorutynom ją uzyskać.
var mu sync.Mutex
func sekcjaKrytyczna() {
// Blokuj dostęp, aby wyłącznie uzyskać dostęp do zasobu
mu.Lock()
// Tutaj uzyskaj dostęp do współdzielonego zasobu
// ...
// Zwolnij blokadę, aby inne gorutyny mogły ją uzyskać
mu.Unlock()
}
2.1.2 Praktyczne użycie mutexa
Załóżmy, że musimy utrzymać globalny licznik, a kilka gorutyn musi zwiększać jego wartość. Użycie mutexa może zapewnić dokładność licznika.
var (
mu sync.Mutex
licznik int
)
func inkrementacja() {
mu.Lock() // Zablokuj przed modyfikacją licznika
licznik++ // Bezpiecznie zwiększaj licznik
mu.Unlock() // Odblokuj po operacji, pozwalając innym gorutynom na dostęp do licznika
}
func main() {
for i := 0; i < 10; i++ {
go inkrementacja() // Uruchom kilka gorutyn do zwiększania wartości licznika
}
// Poczekaj chwilę (w praktyce powinieneś użyć WaitGroup lub innych metod, aby poczekać, aż wszystkie gorutyny zostaną zakończone)
time.Sleep(1 * time.Second)
fmt.Println(licznik) // Wyświetl wartość licznika
}
2.2 Mutex do odczytu i zapisu (sync.RWMutex)
2.2.1 Pojęcie mutexu do odczytu i zapisu
RWMutex to specjalny rodzaj blokady, który pozwala wielu gorutynom na równoczesne odczytanie wspólnych zasobów, podczas gdy operacje zapisu są wyłączne. W porównaniu do mutexów, blokady do odczytu i zapisu mogą poprawić wydajność w przypadku wielu operacji odczytu. Posiada cztery metody: RLock
, RUnlock
do blokowania i odblokowywania operacji odczytu oraz Lock
, Unlock
do blokowania i odblokowywania operacji zapisu.
2.2.2 Praktyczne zastosowania mutexu do odczytu i zapisu
W aplikacji bazy danych operacje odczytu mogą być znacznie częstsze niż operacje zapisu. Użycie blokady do odczytu i zapisu może poprawić wydajność systemu, ponieważ pozwala wielu gorutynom na równoczesne odczytywanie.
var (
rwMu sync.RWMutex
dane int
)
func odczytDanych() int {
rwMu.RLock() // Uzyskaj blokadę do odczytu, pozwalając innym operacjom odczytu na równoległe wykonanie
defer rwMu.RUnlock() // Upewnij się, że blokada zostanie zwolniona przy użyciu defer
return dane // Bezpieczne odczytanie danych
}
func zapisDanych(nowaWartosc int) {
rwMu.Lock() // Uzyskaj blokadę do zapisu, uniemożliwiając inne operacje odczytu lub zapisu w tym czasie
dane = nowaWartosc // Bezpieczne zapisz nową wartość
rwMu.Unlock() // Odblokuj po zakończeniu zapisu
}
func main() {
go zapisDanych(42) // Uruchom gorutynę do wykonania operacji zapisu
fmt.Println(odczytDanych()) // Główna gorutyna wykonuje operację odczytu
// Użyj WaitGroup lub innych metod synchronizacji, aby upewnić się, że wszystkie gorutyny zostaną zakończone
}
W powyższym przykładzie, wielu czytelników może jednocześnie wykonywać funkcję odczytDanych
, ale pisarz wykonujący zapisDanych
zablokuje nowych czytelników i inne pisarze. Mechanizm ten zapewnia korzyści wydajnościowe dla scenariuszy z większą liczbą odczytów niż zapisów.
2.3 Zmienne warunkowe (sync.Cond
)
2.3.1 Pojęcie Zmiennych Warunkowych
W mechanizmie synchronizacji języka Go, zmienne warunkowe są używane do oczekiwania lub powiadamiania o zmianach warunków jako prymityw synchronizacji. Zmienne warunkowe są zawsze używane w połączeniu z muteksem (sync.Mutex
), który chroni spójność samego warunku.
Pojęcie zmiennych warunkowych pochodzi z dziedziny systemów operacyjnych, umożliwiając grupie goroutin oczekiwanie na spełnienie określonego warunku. Konkretnie, goroutyna może wstrzymać wykonanie w oczekiwaniu na spełnienie warunku, a inna goroutyna może poinformować inne gorutyny o wznowieniu wykonania po zmianie warunku, korzystając z zmiennej warunkowej.
W standardowej bibliotece języka Go, zmienne warunkowe są dostarczane za pomocą typu sync.Cond
, a ich główne metody obejmują:
-
Wait
: Wywołanie tej metody zwolni blokadę i zablokuje do momentu, gdy inna goroutyna wywołaSignal
lubBroadcast
na tej samej zmiennej warunkowej, aby ją obudzić, po czym spróbuje ponownie uzyskać blokadę. -
Signal
: Budzi jedną gorutynę oczekującą na tę zmienną warunkową. Jeśli żadna gorutyna nie oczekuje, wywołanie tej metody nie ma żadnego efektu. -
Broadcast
: Budzi wszystkie gorutyny oczekujące na tę zmienną warunkową.
Zmienne warunkowe nie powinny być kopiowane, dlatego zazwyczaj są używane jako pole wskaźnika pewnej struktury.
2.3.2 Praktyczne Przykłady Użycia Zmiennych Warunkowych
Oto przykład użycia zmiennych warunkowych demonstrujący prosty model producent-konsument:
package main
import (
"fmt"
"sync"
"time"
)
// SafeQueue to bezpieczna kolejka chroniona przez muteks
type SafeQueue struct {
mu sync.Mutex
cond *sync.Cond
queue []interface{}
}
// Enqueue dodaje element na koniec kolejki i powiadamia oczekujące gorutyny
func (sq *SafeQueue) Enqueue(item interface{}) {
sq.mu.Lock()
defer sq.mu.Unlock()
sq.queue = append(sq.queue, item)
sq.cond.Signal() // Powiadamia oczekujące gorutyny, że kolejka nie jest pusta
}
// Dequeue usuwa element z głowy kolejki, czeka, jeśli kolejka jest pusta
func (sq *SafeQueue) Dequeue() interface{} {
sq.mu.Lock()
defer sq.mu.Unlock()
// Czekaj, gdy kolejka jest pusta
for len(sq.queue) == 0 {
sq.cond.Wait() // Czekaj na zmianę warunku
}
item := sq.queue[0]
sq.queue = sq.queue[1:]
return item
}
func main() {
queue := make([]interface{}, 0)
sq := SafeQueue{
mu: sync.Mutex{},
cond: sync.NewCond(&sync.Mutex{}),
queue: queue,
}
// Gorutyna Producent
go func() {
for i := 0; i < 5; i++ {
time.Sleep(1 * time.Second) // Symuluj czas produkcji
sq.Enqueue(fmt.Sprintf("element%d", i)) // Produkuj element
fmt.Println("Produkt:", i)
}
}()
// Gorutyna Konsument
go func() {
for i := 0; i < 5; i++ {
item := sq.Dequeue() // Konsumuj element, czekaj, jeśli kolejka jest pusta
fmt.Printf("Konsumuj: %v\n", item)
}
}()
// Poczekaj przez wystarczający czas, aby upewnić się, że cała produkcja i konsumpcja są ukończone
time.Sleep(10 * time.Second)
}
W tym przykładzie zdefiniowaliśmy strukturę SafeQueue
z wewnętrzną kolejką i zmienną warunkową. Gdy konsument wywołuje metodę Dequeue
i kolejka jest pusta, czeka za pomocą metody Wait
. Gdy producent wywołuje metodę Enqueue
w celu dodania nowego elementu, używa metody Signal
, aby obudzić oczekującego konsumenta.
2.4 WaitGroup
2.4.1 Pojęcie i Użycie WaitGroup
sync.WaitGroup
to mechanizm synchronizacji używany do oczekiwania na zakończenie grupy gorutyn. Gdy uruchamiasz gorutynę, możesz zwiększyć licznik, wywołując metodę Add
, a każda gorutyna może wywołać metodę Done
(która faktycznie wykonuje Add(-1)
) po zakończeniu swojego zadania. Główna gorutyna może zablokować się, wywołując metodę Wait
, aż licznik osiągnie wartość 0, co oznacza, że wszystkie gorutyny zakończyły swoje zadania.
Podczas korzystania z WaitGroup
należy zwrócić uwagę na następujące kwestie:
- Metody
Add
,Done
iWait
nie są bezpieczne wątkowo i nie powinny być wywoływane równocześnie w wielu gorutynach. - Metodę
Add
należy wywołać przed rozpoczęciem nowo utworzonej gorutyny.
2.4.2 Praktyczne zastosowania WaitGroup
Poniżej znajduje się przykład użycia WaitGroup
:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // Powiadomienie WaitGroup po zakończeniu
fmt.Printf("Rozpoczęcie pracy przez pracownika %d\n", id)
time.Sleep(time.Second) // Symulacja długotrwałej operacji
fmt.Printf("Pracownik %d zakończył pracę\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // Inkrementacja licznika przed uruchomieniem goroutine
go worker(i, &wg)
}
wg.Wait() // Oczekiwanie, aż wszystkie gorutyny pracownicze zakończą pracę
fmt.Println("Wszyscy pracownicy zakończyli pracę")
}
W tym przykładzie funkcja worker
symuluje wykonywanie zadania. W funkcji main
uruchamiamy pięć gorutyn worker
. Przed uruchomieniem każdej gorutyny wywołujemy wg.Add(1)
to powiadamia WaitGroup
, że rozpoczyna się nowe zadanie. Gdy każda funkcja pracownika zostanie ukończona, wywołuje defer wg.Done()
by poinformować WaitGroup
, że zadanie jest zakończone. Po uruchomieniu wszystkich gorutyn, funkcja main
blokuje się na wg.Wait()
, aż wszystkie pracowniki zgłoszą zakończenie.
2.5 Operacje Atomowe (sync/atomic
)
2.5.1 Pojęcie Operacji Atomowych
Operacje atomowe odnoszą się do operacji w programowaniu współbieżnym, które są niepodzielne, co oznacza, że nie są przerywane przez inne operacje podczas wykonywania. Dla wielu gorutyn, użycie operacji atomowych może zapewnić spójność danych i synchronizację stanu bez konieczności blokowania, ponieważ same operacje atomowe gwarantują atomowość wykonania.
W języku Go pakiet sync/atomic
dostarcza operacji atomowej pamięci na niskim poziomie. Dla podstawowych typów danych takich jak int32
, int64
, uint32
, uint64
, uintptr
i pointer
, metody z pakietu sync/atomic
można używać do bezpiecznych operacji współbieżnych. Istotą operacji atomowych jest stanowienie podstawy dla budowy innych prymitywów współbieżnych (takich jak blokady i zmienne warunkowe) oraz często większa wydajność w porównaniu z mechanizmami blokowania.
2.5.2 Praktyczne zastosowania operacji atomowych
Rozważmy scenariusz, w którym musimy śledzić jednoczesną liczbę odwiedzających stronę internetową. Używając prostej zmiennej licznikowej, zwiększalibyśmy licznik, gdy odwiedzający przybywa, i zmniejszalibyśmy go, gdy odwiedzający odchodzi. Jednak w środowisku współbieżnym ten podejście prowadziłoby do wyścigów danych (data races). Dlatego możemy użyć pakietu sync/atomic
, aby bezpiecznie manipulować licznikiem.
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var liczbaOdwiedzających int32
func zwiększLiczbeOdwiedzających() {
atomic.AddInt32(&liczbaOdwiedzających, 1)
}
func zmniejszLiczbeOdwiedzających() {
atomic.AddInt32(&liczbaOdwiedzających, -1)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
zwiększLiczbeOdwiedzających()
time.Sleep(time.Second) // Czas wizyty odwiedzającego
zmniejszLiczbeOdwiedzających()
wg.Done()
}()
}
wg.Wait()
fmt.Printf("Aktualna liczba odwiedzających: %d\n", liczbaOdwiedzających)
}
W tym przykładzie tworzymy 100 gorutyn, aby symulować przybycie i odjazd odwiedzających. Używając funkcji atomic.AddInt32()
, zapewniamy, że inkrementacje i dekrementacje licznika są atomowe, nawet w silnie współbieżnych sytuacjach, co zapewnia dokładność liczbaOdwiedzających
.
2.6 Mechanizm synchronizacji kanału
2.6.1 Charakterystyka synchronizacji kanału
Kanały są sposobem komunikacji gorutyn w języku Go na poziomie języka. Kanał umożliwia przesyłanie i odbieranie danych. Gdy gorutyna próbuje odczytać dane z kanału, a kanał nie ma danych, zablokuje się do momentu dostępności danych. Podobnie, jeśli kanał jest pełny (dla niebuforowanego kanału oznacza to, że już ma dane), gorutyna próbująca przesłać dane również zablokuje się do momentu uzyskania miejsca do zapisu. Ta funkcja czyni kanały bardzo przydatnymi do synchronizacji między gorutynami.
2.6.2 Przypadki użycia synchronizacji kanałów
Załóżmy, że mamy zadanie, które musi być wykonane przez wiele gorutyn, z których każda obsługuje podzadanie, a następnie musimy zebrać wyniki wszystkich podzadań. Możemy użyć kanału do oczekiwania na zakończenie wszystkich gorutyn.
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup, resultChan chan<- int) {
defer wg.Done()
// Wykonaj pewne operacje...
fmt.Printf("Rozpoczęcie pracy przez pracownika %d\n", id)
// Załóżmy, że wynik podzadania to identyfikator pracownika
resultChan <- id
fmt.Printf("Pracownik %d zakończył pracę\n", id)
}
func main() {
var wg sync.WaitGroup
numWorkers := 5
resultChan := make(chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, &wg, resultChan)
}
go func() {
wg.Wait()
close(resultChan)
}()
// Zebranie wszystkich wyników
for result := range resultChan {
fmt.Printf("Odebrany wynik: %d\n", result)
}
}
W tym przykładzie uruchamiamy 5 gorutyn do wykonania zadań i zbieramy wyniki za pomocą kanału resultChan
. Główna gorutyna oczekuje na zakończenie wszystkich zadań w oddzielnej gorutynie, a następnie zamyka kanał wynikowy. Następnie główna gorutyna przechodzi przez kanał resultChan
, zbierając i drukując wyniki wszystkich gorutyn.
2.7 Wykonywanie jednorazowe (sync.Once
)
sync.Once
jest prymitywem synchronizacji, który zapewnia, że operacja jest wykonywana tylko raz podczas wykonywania programu. Typowe zastosowanie sync.Once
to inicjalizacja obiektu singleton lub scenariusze wymagające opóźnionej inicjalizacji. Bez względu na to, ile gorutyn wywołuje tę operację, zostanie ona wykonana tylko raz, stąd nazwa funkcji Do
.
sync.Once
doskonale łączy problemy współbieżności i wydajność wykonania, eliminując obawy o problemy wydajności spowodowane powtórzoną inicjalizacją.
Jako prosty przykład demonstrujący użycie sync.Once
:
package main
import (
"fmt"
"sync"
)
var once sync.Once
var instance *Singleton
type Singleton struct{}
func Instance() *Singleton {
once.Do(func() {
fmt.Println("Tworzenie pojedynczej instancji teraz.")
instance = &Singleton{}
})
return instance
}
func main() {
for i := 0; i < 10; i++ {
go Instance()
}
fmt.Scanln() // Poczekaj, aby zobaczyć wynik
}
W tym przykładzie, nawet jeśli funkcja Instance
jest wywoływana współbieżnie wielokrotnie, utworzenie instancji Singleton
nastąpi tylko raz. Kolejne wywołania będą bezpośrednio zwracać instancję singletona utworzoną po raz pierwszy, zapewniając unikalność instancji.
2.8 ErrGroup
ErrGroup
to biblioteka w języku Go używana do synchronizacji wielu gorutyn i zbierania ich błędów. Jest częścią pakietu "golang.org/x/sync/errgroup", zapewniając zwięzły sposób obsługi scenariuszy błędów w operacjach współbieżnych.
2.8.1 Koncepcja ErrGroup
Głównym pomysłem ErrGroup
jest powiązanie grupy powiązanych zadań (zazwyczaj wykonywanych równolegle) razem, i jeśli jedno z zadań zawiedzie, wykonanie całej grupy zostanie anulowane. Jednocześnie, jeśli którykolwiek z tych operacji równoległych zwraca błąd, ErrGroup
przechwyci i zwróci ten błąd.
Aby użyć ErrGroup
, najpierw zaimportuj pakiet:
import "golang.org/x/sync/errgroup"
Następnie utwórz instancję ErrGroup
:
var g errgroup.Group
Następnie przekaż zadania do ErrGroup
w postaci domknięć i uruchom nową gorutynę, wywołując metodę Go
:
g.Go(func() error {
// Wykonaj określone zadanie
// Jeśli wszystko pójdzie dobrze
return nil
// Jeśli wystąpi błąd
// return fmt.Errorf("wystąpił błąd")
})
W końcu, wywołaj metodę Wait
, która zablokuje i poczeka, aż wszystkie zadania zostaną zakończone. Jeśli któreś z tych zadań zwróci błąd, Wait
zwróci ten błąd:
if err := g.Wait(); err != nil {
// Obsłuż błąd
log.Fatalf("Błąd wykonania zadania: %v", err)
}
2.8.2 Praktyczny Przykład użycia ErrGroup
Rozważmy scenariusz, w którym równocześnie potrzebujemy pobrać dane z trzech różnych źródeł danych, i jeśli któreś z tych źródeł danych zawiedzie, chcemy natychmiast przerwać pozostałe operacje pobierania danych. To zadanie można łatwo zrealizować za pomocą ErrGroup
:
package main
import (
"fmt"
"golang.org/x/sync/errgroup"
)
func fetchDataFromSource1() error {
// Symulacja pobierania danych z źródła 1
return nil // lub zwrócenie błędu w celu zasymulowania awarii
}
func fetchDataFromSource2() error {
// Symulacja pobierania danych z źródła 2
return nil // lub zwrócenie błędu w celu zasymulowania awarii
}
func fetchDataFromSource3() error {
// Symulacja pobierania danych z źródła 3
return nil // lub zwrócenie błędu w celu zasyumowania awarii
}
func main() {
var g errgroup.Group
g.Go(fetchDataFromSource1)
g.Go(fetchDataFromSource2)
g.Go(fetchDataFromSource3)
// Oczekiwanie na zakończenie wszystkich gorutyn i zebranie ich błędów
if err := g.Wait(); err != nil {
fmt.Printf("Wystąpił błąd podczas pobierania danych: %v\n", err)
return
}
fmt.Println("Wszystkie dane zostały pomyślnie pobrane!")
}
W tym przykładzie funkcje fetchDataFromSource1
, fetchDataFromSource2
i fetchDataFromSource3
symulują pobieranie danych z różnych źródeł. Są one przekazywane do metody g.Go
i wykonane w osobnych gorutynach. Jeśli którakolwiek z funkcji zwróci błąd, g.Wait
natychmiast zwróci ten błąd, umożliwiając odpowiednie zarządzanie błędami w momencie wystąpienia błędu. Jeśli wszystkie funkcje zostaną pomyślnie wykonane, g.Wait
zwróci nil
, co oznacza, że wszystkie zadania zostały pomyślnie zakończone.
Inną ważną cechą ErrGroup
jest to, że jeśli którykolwiek z gorutyn rzuci wyjątek (panikę), zostanie ona próbowana przechwycić i zwrócona jako błąd. Pomaga to zapobiec innym równocześnie działającym gorutynom w niewłaściwym zakończeniu. Oczywiście, jeśli chcesz, aby zadania reagowały na zewnętrzne sygnały anulowania, możesz połączyć funkcję WithContext
z pakietem context, aby dostarczyć context możliwy do anulowania.
W ten sposób ErrGroup
staje się bardzo praktycznym mechanizmem synchronizacji i obsługi błędów w praktyce programowania współbieżnego w Go.