1. Vai trò của cơ chế đồng bộ hóa
Trong lập trình đồng thời, khi nhiều goroutines chia sẻ tài nguyên, cần đảm bảo rằng tài nguyên chỉ có thể được truy cập bởi một goroutine vào một thời điểm để ngăn chặn các điều kiện cạnh tranh. Điều này đòi hỏi sử dụng các cơ chế đồng bộ hóa. Các cơ chế đồng bộ hóa có thể phối hợp thứ tự truy cập của các goroutines khác nhau đến tài nguyên chia sẻ, đảm bảo tính nhất quán dữ liệu và đồng bộ trạng thái trong môi trường đồng thời.
Ngôn ngữ Go cung cấp một bộ cơ chế đồng bộ hóa phong phú, bao gồm nhưng không giới hạn ở:
- Khóa định (
sync.Mutex
) và khóa đọc-giữ (sync.RWMutex
) - Kênh (
Channels
) - Nhóm đợi (
WaitGroups
) - Hàm nguyên tử (
atomic
package) - Biến điều kiện (
sync.Cond
)
2. Nguyên lý đồng bộ hóa
2.1 Khóa định (sync.Mutex
)
2.1.1 Khái niệm và vai trò của khóa định
Khóa định là một cơ chế đồng bộ hóa đảm bảo hoạt động an toàn của tài nguyên chia sẻ bằng cách chỉ cho phép một goroutine giữ khóa để truy cập tài nguyên chia sẻ vào bất kỳ thời điểm nào. Khóa định đạt được đồng bộ hóa thông qua các phương thức Lock
và Unlock
. Gọi phương thức Lock
sẽ chặn cho đến khi khóa được giải phóng và tại thời điểm này, các goroutines khác cố gắng giành khóa sẽ chờ đợi. Gọi Unlock
giải phóng khóa, cho phép các goroutines khác đang chờ giành nó.
var mu sync.Mutex
func criticalSection() {
// Giành khóa để truy cập tài nguyên một cách độc quyền
mu.Lock()
// Truy cập tài nguyên chia sẻ tại đây
// ...
// Giải phóng khóa để cho phép các goroutines khác giành lấy nó
mu.Unlock()
}
2.1.2 Ứng dụng thực tế của khóa định
Giả sử chúng ta cần duy trì một bộ đếm toàn cục và nhiều goroutines cần tăng giá trị của nó. Sử dụng khóa định có thể đảm bảo sự chính xác của bộ đếm.
var (
mu sync.Mutex
counter int
)
func increment() {
mu.Lock() // Giữ khóa trước khi sửa đổi bộ đếm
counter++ // Tăng bộ đếm một cách an toàn
mu.Unlock() // Giải phóng sau khi thực hiện, cho phép các goroutines khác truy cập bộ đếm
}
func main() {
for i := 0; i < 10; i++ {
go increment() // Bắt đầu nhiều goroutines để tăng giá trị của bộ đếm
}
// Chờ một khoảng thời gian (trong thực tế, bạn nên sử dụng WaitGroup hoặc các phương pháp khác để chờ tất cả goroutines hoàn thành)
time.Sleep(1 * time.Second)
fmt.Println(counter) // Output giá trị của bộ đếm
}
2.2 Khóa đọc-giữ (sync.RWMutex
)
2.2.1 Khái niệm của khóa đọc-giữ
RWMutex là một loại khóa đặc biệt cho phép nhiều goroutines đọc tài nguyên chia sẻ đồng thời, trong khi các hoạt động ghi là độc quyền. So với khóa định, khóa đọc-giữ có thể cải thiện hiệu suất trong các kịch bản với nhiều người đọc. Nó có bốn phương thức: RLock
, RUnlock
để khóa và mở khóa các thao tác đọc, và Lock
, Unlock
để khóa và mở khóa các thao tác ghi.
2.2.2 Các trường hợp sử dụng thực tế của Khóa đọc-giữ
Trong ứng dụng cơ sở dữ liệu, các thao tác đọc có thể phổ biến hơn nhiều so với các thao tác ghi. Sử dụng khóa đọc-giữ có thể cải thiện hiệu suất hệ thống vì nó cho phép nhiều goroutines đọc đồng thời.
var (
rwMu sync.RWMutex
data int
)
func readData() int {
rwMu.RLock() // Giành khóa đọc, cho phép các thao tác đọc khác tiến hành đồng thời
defer rwMu.RUnlock() // Đảm bảo rằng khóa được giải phóng sử dụng defer
return data // Đọc dữ liệu một cách an toàn
}
func writeData(newValue int) {
rwMu.Lock() // Giành khóa ghi, ngăn chặn các thao tác đọc hoặc ghi khác vào thời điểm này
data = newValue // Ghi giá trị mới một cách an toàn
rwMu.Unlock() // Mở khóa sau khi việc ghi hoàn thành
}
func main() {
go writeData(42) // Bắt đầu một goroutine để thực hiện một thao tác ghi
fmt.Println(readData()) // Goroutine chính thực hiện một thao tác đọc
// Sử dụng WaitGroup hoặc các phương pháp đồng bộ hóa khác để đảm bảo rằng tất cả goroutines đã hoàn thành
}
Trong ví dụ trên, nhiều người đọc có thể thực hiện hàm readData
đồng thời, nhưng một người ghi thực hiện writeData
sẽ chặn lại các người đọc mới và người ghi khác. Cơ chế này cung cấp lợi ích về hiệu suất cho các kịch bản với nhiều thao tác đọc hơn thao tác ghi.
2.3 Biến điều kiện (sync.Cond
)
2.3.1 Khái niệm về Biến Điều Kiện
Trong cơ chế đồng bộ hóa ngôn ngữ Go, biến điều kiện được sử dụng để chờ đợi hoặc thông báo về sự thay đổi của một điều kiện nhất định như một nguyên tắc đồng bộ hóa. Biến điều kiện luôn được sử dụng cùng với một khóa mutex (sync.Mutex
), được sử dụng để bảo vệ tính nhất quán của điều kiện đó.
Khái niệm về biến điều kiện bắt nguồn từ lĩnh vực hệ điều hành, cho phép một nhóm các goroutines chờ đợi một điều kiện nhất định được đáp ứng. Cụ thể hơn, một goroutine có thể tạm dừng khi chờ đợi cho một điều kiện được đáp ứng, và một goroutine khác có thể thông báo cho các goroutine khác để tiếp tục thực thi sau khi thay đổi điều kiện bằng cách sử dụng biến điều kiện.
Trong thư viện chuẩn Go, biến điều kiện được cung cấp qua kiểu sync.Cond
, và các phương thức chính bao gồm:
-
Wait
: Gọi phương thức này sẽ giải phóng khóa đang giữ và chặn cho đến khi một goroutine khác gọiSignal
hoặcBroadcast
trên cùng một biến điều kiện để thức dậy nó, sau đó sẽ cố gắng một lần nữa để có được khóa. -
Signal
: Thức dậy một goroutine đang chờ đợi biến điều kiện này. Nếu không có goroutine nào đang chờ đợi, việc gọi phương thức này sẽ không có tác dụng. -
Broadcast
: Thức dậy tất cả các goroutine đang chờ đợi biến điều kiện này.
Biến điều kiện không nên được sao chép, vì vậy chúng thường được sử dụng như một trường con trỏ của một cấu trúc cụ thể.
2.3.2 Các Trường Hợp Thực Tế của Biến Điều Kiện
Dưới đây là một ví dụ sử dụng biến điều kiện mô tả mô hình sản xuất - tiêu thụ đơn giản:
package main
import (
"fmt"
"sync"
"time"
)
// SafeQueue là một hàng đợi an toàn được bảo vệ bởi một mutex
type SafeQueue struct {
mu sync.Mutex
cond *sync.Cond
queue []interface{}
}
// Enqueue thêm một phần tử vào cuối hàng đợi và thông báo cho các goroutine đang chờ đợi
func (sq *SafeQueue) Enqueue(item interface{}) {
sq.mu.Lock()
defer sq.mu.Unlock()
sq.queue = append(sq.queue, item)
sq.cond.Signal() // Thông báo cho các goroutine đang chờ đợi rằng hàng đợi không trống
}
// Dequeue loại bỏ một phần tử từ đầu hàng đợi, chờ nếu hàng đợi trống
func (sq *SafeQueue) Dequeue() interface{} {
sq.mu.Lock()
defer sq.mu.Unlock()
// Chờ khi hàng đợi trống
for len(sq.queue) == 0 {
sq.cond.Wait() // Chờ thay đổi của điều kiện
}
item := sq.queue[0]
sq.queue = sq.queue[1:]
return item
}
func main() {
queue := make([]interface{}, 0)
sq := SafeQueue{
mu: sync.Mutex{},
cond: sync.NewCond(&sync.Mutex{}),
queue: queue,
}
// Goroutine Sản xuất
go func() {
for i := 0; i < 5; i++ {
time.Sleep(1 * time.Second) // Giả lập thời gian sản xuất
sq.Enqueue(fmt.Sprintf("item%d", i)) // Sản xuất một phần tử
fmt.Println("Sản xuất:", i)
}
}()
// Goroutine Tiêu thụ
go func() {
for i := 0; i < 5; i++ {
item := sq.Dequeue() // Tiêu thụ một phần tử, chờ nếu hàng đợi trống
fmt.Printf("Tiêu thụ: %v\n", item)
}
}()
// Chờ một khoảng thời gian đủ lớn để đảm bảo tất cả sản xuất và tiêu thụ được hoàn thành
time.Sleep(10 * time.Second)
}
Trong ví dụ này, chúng ta đã định nghĩa một cấu trúc SafeQueue
với một hàng đợi nội bộ và một biến điều kiện. Khi người tiêu thụ gọi phương thức Dequeue
và hàng đợi trống, nó sẽ chờ sử dụng phương thức Wait
. Khi người sản xuất gọi phương thức Enqueue
để thêm một phần tử mới vào, nó sử dụng phương thức Signal
để đánh thức người tiêu thụ đang chờ đợi.
2.4 Nhóm Chờ
2.4.1 Khái niệm và Sử dụng của Nhóm Chờ
sync.WaitGroup
là một cơ chế đồng bộ hóa được sử dụng để chờ một nhóm các goroutines hoàn thành. Khi bắt đầu một goroutine, bạn có thể tăng bộ đếm bằng cách gọi phương thức Add
, và mỗi goroutine có thể gọi phương thức Done
(thực sự thực hiện Add(-1)
) khi nó hoàn thành. Goroutine chính có thể chặn bằng cách gọi phương thức Wait
cho đến khi bộ đếm đạt đến 0, cho biết rằng tất cả các goroutine đã hoàn thành nhiệm vụ của họ.
Khi sử dụng WaitGroup
, các điểm sau cần lưu ý:
- Các phương thức
Add
,Done
, vàWait
không an toàn đối với luồng và không nên được gọi đồng thời trong nhiều goroutines. - Phương thức
Add
nên được gọi trước khi goroutine mới được tạo.
2.4.2 Các Trường Hợp Sử Dụng Thực Tế của WaitGroup
Dưới đây là một ví dụ về việc sử dụng WaitGroup
:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // Thông báo cho WaitGroup khi hoàn thành
fmt.Printf("Worker %d bắt đầu\n", id)
time.Sleep(time.Second) // Giả lập hoạt động tốn thời gian
fmt.Printf("Worker %d hoàn thành\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // Tăng bộ đếm trước khi bắt đầu goroutine
go worker(i, &wg)
}
wg.Wait() // Chờ cho tất cả các goroutine worker hoàn thành
fmt.Println("Tất cả các workers đã hoàn thành")
}
Trong ví dụ này, hàm worker
giả lập việc thực hiện một nhiệm vụ. Trong hàm main, chúng ta bắt đầu năm goroutine worker
. Trước khi bắt đầu mỗi goroutine, chúng ta gọi wg.Add(1)
để thông báo cho WaitGroup
rằng một nhiệm vụ mới đang được thực hiện. Khi mỗi hàm worker hoàn thành, nó gọi defer wg.Done()
để thông báo cho WaitGroup
rằng nhiệm vụ đã hoàn thành. Sau khi bắt đầu tất cả các goroutine, hàm main chặn tại wg.Wait()
cho đến khi tất cả các worker báo cáo hoàn thành.
2.5 Các Phép Toán Atomic (sync/atomic
)
2.5.1 Khái Niệm về Các Phép Toán Atomic
Các phép toán atomic đề cập đến các phép toán trong lập trình song song mà không thể chia nhỏ, có nghĩa là chúng không bị gián đoạn bởi các phép toán khác trong quá trình thực thi. Đối với nhiều goroutine, việc sử dụng các phép toán atomic có thể đảm bảo tính nhất quán dữ liệu và đồng bộ trạng thái mà không cần sử dụng khóa, vì các phép toán atomic tự đảm bảo tính atomic của việc thực thi.
Trong ngôn ngữ Go, gói sync/atomic
cung cấp các phép toán nhớt cấp thấp trên bộ nhớ atomic. Đối với các kiểu dữ liệu cơ bản như int32
, int64
, uint32
, uint64
, uintptr
, và pointer
, các phương thức từ gói sync/atomic
có thể được sử dụng cho các phép toán đồng thời an toàn. Tính quan trọng của các phép toán atomic nằm ở việc chúng là nền tảng để xây dựng các nguyên tố đồng thời khác (như khóa và biến điều kiện) và thường hiệu quả hơn cơ chế khóa.
2.5.2 Các Trường Hợp Sử Dụng Thực Tế của Các Phép Toán Atomic
Hãy xem xét một tình huống nơi chúng ta cần theo dõi số lượng người dùng đồng thời truy cập vào một trang web. Sử dụng một biến đếm đơn giản, chúng ta có thể tăng biến đếm khi một người dùng đến và giảm nó khi người dùng rời đi. Tuy nhiên, trong môi trường đồng thời, phương pháp này sẽ dẫn đến cạnh tranh dữ liệu. Do đó, chúng ta có thể sử dụng gói sync/atomic
để điều chỉnh biến đếm một cách an toàn.
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var visitorCount int32
func incrementVisitorCount() {
atomic.AddInt32(&visitorCount, 1)
}
func decrementVisitorCount() {
atomic.AddInt32(&visitorCount, -1)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
incrementVisitorCount()
time.Sleep(time.Second) // Thời gian thăm của người dùng
decrementVisitorCount()
wg.Done()
}()
}
wg.Wait()
fmt.Printf("Số lượt truy cập hiện tại: %d\n", visitorCount)
}
Trong ví dụ này, chúng ta tạo 100 goroutine để giả lập sự đến và rời của người dùng. Bằng cách sử dụng hàm atomic.AddInt32()
, chúng ta đảm bảo rằng việc tăng giảm biến đếm là atomic, ngay cả trong các tình huống cực kỳ đồng thời, đảm bảo tính chính xác của visitorCount
.
2.6 Cơ Chế Đồng Bộ Hóa Kênh
2.6.1 Đặc Điểm Đồng Bộ Hóa của Kênh
Kênh là cách mà các goroutine có thể giao tiếp trong ngôn ngữ Go ở mức độ ngôn ngữ. Một kênh cung cấp khả năng gửi và nhận dữ liệu. Khi một goroutine cố gắng đọc dữ liệu từ kênh và kênh không có dữ liệu, nó sẽ chặn cho đến khi có dữ liệu có sẵn. Tương tự, nếu kênh đã đầy (đối với kênh không có bộ đệm, điều này có nghĩa là nó đã có dữ liệu), goroutine cố gắng gửi dữ liệu cũng sẽ chặn cho đến khi có không gian để ghi. Tính năng này khiến kênh rất hữu ích cho việc đồng bộ hóa giữa các goroutine.
2.6.2 Các trường hợp sử dụng đồng bộ hóa với Channels
Giả sử chúng ta có một nhiệm vụ cần hoàn thành bởi nhiều goroutines, mỗi goroutine xử lý một phần nhiệm vụ con, và sau đó chúng ta cần tổng hợp kết quả của tất cả các nhiệm vụ con. Chúng ta có thể sử dụng một channel để chờ đợi cho tất cả các goroutines hoàn thành.
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup, resultChan chan<- int) {
defer wg.Done()
// Thực hiện một số hoạt động...
fmt.Printf("Worker %d bắt đầu\n", id)
// Giả sử kết quả của nhiệm vụ con là id của worker
resultChan <- id
fmt.Printf("Worker %d hoàn thành\n", id)
}
func main() {
var wg sync.WaitGroup
numWorkers := 5
resultChan := make(chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, &wg, resultChan)
}
go func() {
wg.Wait()
close(resultChan)
}()
// Thu thập tất cả kết quả
for result := range resultChan {
fmt.Printf("Nhận được kết quả: %d\n", result)
}
}
Trong ví dụ này, chúng ta bắt đầu 5 goroutines để thực hiện các nhiệm vụ và thu thập kết quả thông qua channel resultChan
. Goroutine chính đợi cho tất cả công việc hoàn thành trong một goroutine riêng và sau đó đóng channel kết quả. Sau đó, goroutine chính duyệt qua channel resultChan
, thu thập và in kết quả của tất cả goroutines.
2.7 Thực thi một lần (sync.Once
)
sync.Once
là một nguyên tắc đồng bộ hóa đảm bảo rằng một hoạt động chỉ được thực hiện một lần trong quá trình thực thi chương trình. Một cách sử dụng điển hình của sync.Once
là trong việc khởi tạo một đối tượng duy nhất hoặc trong các tình huống yêu cầu khởi tạo trì hoãn. Bất kể có bao nhiêu goroutines gọi hoạt động này, nó sẽ chỉ chạy một lần, vì vậy tên của hàm Do
.
sync.Once
hoàn hảo cân bằng giữa vấn đề song song và hiệu suất thực thi, loại bỏ những lo ngại về vấn đề hiệu suất do khởi tạo lặp lại gây ra.
Ví dụ đơn giản dưới đây để minh họa cách sử dụng sync.Once
:
package main
import (
"fmt"
"sync"
)
var once sync.Once
var instance *Singleton
type Singleton struct{}
func Instance() *Singleton {
once.Do(func() {
fmt.Println("Tạo một phiên bản duy nhất ngay bây giờ.")
instance = &Singleton{}
})
return instance
}
func main() {
for i := 0; i < 10; i++ {
go Instance()
}
fmt.Scanln() // Chờ để xem kết quả đầu ra
}
Trong ví dụ này, ngay cả khi hàm Instance
được gọi đồng thời nhiều lần, việc tạo ra phiên bản Singleton
sẽ chỉ xảy ra một lần duy nhất. Các lần gọi sau sẽ trực tiếp trả về phiên bản duy nhất được tạo lần đầu, đảm bảo tính duy nhất của phiên bản.
2.8 ErrGroup
ErrGroup
là một thư viện trong ngôn ngữ Go được sử dụng để đồng bộ hóa nhiều goroutines và thu thập lỗi từ chúng. Đây là một phần của gói "golang.org/x/sync/errgroup", cung cấp một cách ngắn gọn để xử lý tình huống lỗi trong các hoạt động song song.
2.8.1 Khái niệm về ErrGroup
Ý tưởng cốt lõi của ErrGroup
là liên kết một nhóm các nhiệm vụ liên quan (thường được thực thi song song) với nhau, và nếu một trong những nhiệm vụ gặp lỗi, thực thi của toàn bộ nhóm sẽ bị hủy. Đồng thời, nếu bất kỳ hoạt động song song nào trả về lỗi, ErrGroup
sẽ nắm bắt và trả về lỗi này.
Để sử dụng ErrGroup
, trước tiên nhập gói:
import "golang.org/x/sync/errgroup"
Sau đó, tạo một phiên bản của ErrGroup
:
var g errgroup.Group
Sau đó, bạn có thể truyền các nhiệm vụ cho ErrGroup
dưới dạng closures và bắt đầu một Goroutine mới bằng cách gọi phương thức Go
:
g.Go(func() error {
// Thực hiện một nhiệm vụ cụ thể
// Nếu mọi thứ diễn ra suôn sẻ
return nil
// Nếu xảy ra lỗi
// return fmt.Errorf("đã xảy ra lỗi")
})
Cuối cùng, gọi phương thức Wait
, sẽ chặn và đợi cho tất cả các nhiệm vụ hoàn thành. Nếu bất kỳ nhiệm vụ nào trả về lỗi, Wait
sẽ trả về lỗi đó:
if err := g.Wait(); err != nil {
// Xử lý lỗi
log.Fatalf("Lỗi thực thi nhiệm vụ: %v", err)
}
2.8.2 Trường Hợp Thực Tế của ErrGroup
Hãy xem xét một kịch bản trong đó chúng ta cần đồng thời lấy dữ liệu từ ba nguồn dữ liệu khác nhau, và nếu một trong số các nguồn dữ liệu thất bại, chúng tôi muốn ngay lập tức hủy các hoạt động lấy dữ liệu từ các nguồn khác. Nhiệm vụ này có thể dễ dàng được thực hiện bằng cách sử dụng ErrGroup
:
package main
import (
"fmt"
"golang.org/x/sync/errgroup"
)
func fetchDataFromSource1() error {
// Giả lập việc lấy dữ liệu từ nguồn 1
return nil // hoặc trả về một lỗi để giả lập việc thất bại
}
func fetchDataFromSource2() error {
// Giả lập việc lấy dữ liệu từ nguồn 2
return nil // hoặc trả về một lỗi để giả lập việc thất bại
}
func fetchDataFromSource3() error {
// Giả lập việc lấy dữ liệu từ nguồn 3
return nil // hoặc trả về một lỗi để giả lập việc thất bại
}
func main() {
var g errgroup.Group
g.Go(fetchDataFromSource1)
g.Go(fetchDataFromSource2)
g.Go(fetchDataFromSource3)
// Chờ cho tất cả các Goroutines hoàn thành và thu thập lỗi của chúng
if err := g.Wait(); err != nil {
fmt.Printf("Đã xảy ra lỗi khi lấy dữ liệu: %v\n", err)
return
}
fmt.Println("Tất cả dữ liệu đã được lấy thành công!")
}
Trong ví dụ này, các hàm fetchDataFromSource1
, fetchDataFromSource2
, và fetchDataFromSource3
giả lập việc lấy dữ liệu từ các nguồn dữ liệu khác nhau. Chúng được truyền vào phương thức g.Go
và được thực thi trong các Goroutines riêng biệt. Nếu bất kỳ hàm nào trả về một lỗi, g.Wait
sẽ ngay lập tức trả về lỗi đó, cho phép xử lý lỗi thích hợp khi lỗi xảy ra. Nếu tất cả các hàm thực thi thành công, g.Wait
sẽ trả về nil
, cho biết rằng tất cả các nhiệm vụ đã hoàn thành thành công.
Một tính năng quan trọng khác của ErrGroup
là nếu bất kỳ Goroutine nào gây ra panic, nó sẽ cố gắng khôi phục sự panic này và trả về nó dưới dạng một lỗi. Điều này giúp ngăn chặn các Goroutines khác đang chạy đồng thời từ việc thất bại trong việc tắt một cách lịch sự. Tất nhiên, nếu bạn muốn các nhiệm vụ phản ứng với tín hiệu hủy bên ngoài, bạn có thể kết hợp hàm WithContext
của errgroup
với gói context để cung cấp một ngữ cảnh có thể hủy.
Theo cách này, ErrGroup
trở thành một cơ chế đồng bộ và xử lý lỗi rất thiết thực trong thực hành lập trình đồng thời của Go.