您的位置:首頁 > 軟件教程 > 教程 > Go語言的100個錯誤使用場景(61-68)|并發(fā)實踐

Go語言的100個錯誤使用場景(61-68)|并發(fā)實踐

來源:好特整理 | 時間:2024-04-13 15:34:52 | 閱讀:173 |  標簽: 錯誤 GO   | 分享到:

我的愿景是以這套文章,在保持權威性的基礎上,脫離對原文的依賴,對這100個場景進行篇幅合適的中文講解。所涉內(nèi)容較多,總計約 8w 字,這是該系列的第八篇文章,對應書中第61-68個錯誤場景。

前言

大家好,這里是白澤。 《Go語言的100個錯誤以及如何避免》 是最近朋友推薦我閱讀的書籍,我初步瀏覽之后,大為驚喜。就像這書中第一章的標題說到的:“ Go: Simple to learn but hard to master ”,整本書通過分析100個錯誤使用 Go 語言的場景,帶你深入理解 Go 語言。

我的愿景是以這套文章,在保持權威性的基礎上,脫離對原文的依賴,對這100個場景進行篇幅合適的中文講解。所涉內(nèi)容較多,總計約 8w 字,這是該系列的第八篇文章,對應書中第61-68個錯誤場景。

? 當然,如果您是一位 Go 學習的新手,您可以在我開源的 學習倉庫 中,找到針對 《Go 程序設計語言》 英文書籍的配套筆記,其他所有文章也會整理收集在其中。

? B站: 白澤talk ,公眾號【白澤talk】,聊天交流群:622383022,原書電子版可以加群獲取。

前文鏈接:

  • 《Go語言的100個錯誤使用場景(1-10)|代碼和項目組織》

  • 《Go語言的100個錯誤使用場景(11-20)|項目組織和數(shù)據(jù)類型》

  • 《Go語言的100個錯誤使用場景(21-29)|數(shù)據(jù)類型》

  • 《Go語言的100個錯誤使用場景(30-40)|數(shù)據(jù)類型與字符串使用》

  • 《Go語言的100個錯誤使用場景(40-47)|字符串&函數(shù)&方法》

  • 《Go語言的100個錯誤使用場景(48-54)|錯誤管理》

  • 《Go語言的100個錯誤使用場景(55-60)|并發(fā)基礎》

9. 并發(fā)實踐

? 章節(jié)概述

  • 防止發(fā)生 goroutine 和 channel 中的常見錯誤
  • 理解標準數(shù)據(jù)結構在并發(fā)場景的使用
  • 使用標準庫和一些擴展
  • 避免數(shù)據(jù)競爭和死鎖

9.1 context 的不恰當傳播(#61)

context 作為承載上下文的實例,經(jīng)常在各個函數(shù)之間傳播,由于 context.Context 本身是一個接口,它聲明了四個方法:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key any) any
}

當一個 context 因為過期或者被手動 cancel,都會導致上下文關閉。此時可以從 Done() 獲得的 channel 中獲得關閉信號,以及從 Err() 方法獲得原因。

這也導致了,在傳遞 context 實例的時候,因為一些原因導致傳遞給子步驟的 context 已經(jīng)關閉,但是子步驟中需要使用到,從而造成混淆。

? 假設有一個場景,針對收到的一個 HTTP 請求,服務端會處理一些任務,得到結果A,同時將處理結果A通過 Kafka 異步發(fā)送一個事件,同時主協(xié)程返回任務處理結果A給客戶端。

func handler(w http.ResponseWriter, r *http.Request) {
    response, err := doSomeTask(r.Context(), r)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    go func() {
        err := publish(r.Context(), response)
        // Do something with err
    }()
    
    writeResponse(response)
}

考慮以下三個場景:

  1. 客戶端請求關閉
  2. 如果是 HTTP/2 的請求,當請求被取消
  3. 當 response 已經(jīng)被返回給客戶端

前兩個場景,如果在執(zhí)行完 doSomeTask() 的到 response 并調(diào)用 publish 后,請求被取消,則 publish 函數(shù)是可以允許接收一個被關閉的 context 實例的,只要在函數(shù)內(nèi)判斷當 context 被取消時,不發(fā)送消息即可。 (當然不做任何處理,允許發(fā)送也是沒有問題的)

但如果是已經(jīng)將 writeResponse(response) 觸發(fā),響應給客戶端,則 *http.Request 關聯(lián)的 context 會被取消,此時如果在 publish() 函數(shù)中,做了 context 實例是否被取消的判斷,則會出現(xiàn)混淆。因為此時是執(zhí)行成功的鏈路,只是 go func() 執(zhí)行邏輯因為異步的原因慢了,kafka 消息還是需要發(fā)送的。

? 解決方案:

type detach struct {
    ctx context.Context
}

func (d detach) Deadline() (time.Time, bool) {
    return time.Time{}, false
}

func (d detach) Done() <-chan struct{} {
    return nil
}

func (d detach) Err() error {
    return nil
}

func (d detach) Value(key any) any {
    return d.ctx.Value(key)
}
------------------------
// 使用方式
err := publish(detach{ctx: r.Context()}, response)

自定義 context 實例,將 Done() 和 Err() 方法失效,當不希望 context 的關閉對子步驟造成影響,可以通過這種方式,保留從原 context.Context 的實例中,獲取上下文參數(shù) value 的能力。

9.2 開啟一個協(xié)程但不知道何時關閉(#62)

goroutine 泄漏:

協(xié)程啟動將占用一個約 2KB 大小的棧內(nèi)存空間,并隨著使用增長或者收縮占用的空間,一個協(xié)程可以持有一個引用類型的變量,且分配在堆上。goroutine 也可以持有 HTTP 鏈接、數(shù)據(jù)庫連接池等各種資源,如果協(xié)程發(fā)生了泄漏,則這些協(xié)程內(nèi)原本應該被優(yōu)雅釋放的資源也將發(fā)生泄漏。

? 錯誤示例一:

ch := foo()
go func() {
    for v := range ch {
        //..
    }  
}()

在上述示例中,新創(chuàng)建的協(xié)程只有當主協(xié)程創(chuàng)建的 channel 被關閉的時候才會結束,但是如果外部沒有主動關閉,則這個子協(xié)程會發(fā)生泄漏,永遠無法關閉。

? 錯誤示例二:

假設應用執(zhí)行之前需要通過一個函數(shù)去監(jiān)聽外部的配置信息。

func main() {
    newWatcher()
    
    // Run the application
}

type watcher struct{ /* Some resource */}

func newWatcher() {
    w := watcher{}
    go w.watch()
}

上述代碼的問題在于,newWatcher 函數(shù)內(nèi)啟動的子協(xié)程會由于主協(xié)程的結束而被迫終止,導致 watcher 結構體所持有的資源,沒有被優(yōu)雅關閉。

? 錯誤示例三:

在錯誤示例二的基礎上,容易犯的一個錯誤是,認為可以通過傳遞一個 context 來感知主協(xié)程關閉,從而控制子協(xié)程資源的釋放。

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    newWatcher(ctx)
    // Run the application
}

type watcher struct{ /* Some resource */}

func newWatcher(ctx context.Context) {
    w := watcher{}
    go w.watch(ctx)
}

錯誤原因:此時主協(xié)程如果關閉了傳遞給 watcher 結構體的 context,但是依舊有可能主函數(shù)直接執(zhí)行完成,關閉了,子協(xié)程即使收到了 context 關閉的信號,依舊不一定有時間完成資源的釋放。

? 正確示例:

func main() {
    w := newWatcher()
    defer w.close()
    
    // Run the application
}

func newWatcher() watcher {
    w := watcher{}
    go w.watch()
    return w
}

type watcher struct{ /* Some resource */}

func (w watcher) close() {
    // Close the resources
}

前幾個示例出現(xiàn)資源釋放問題的原因在于,在父協(xié)程關閉的時候,并沒有阻塞等待子協(xié)程資源的釋放,因此正確示例中,主協(xié)程在 return 之前,主動關閉 watcher 結構體持有的資源,實現(xiàn)優(yōu)雅退出。

? 最佳實踐:

將 goroutine 當作一種資源,在創(chuàng)建的開始就需要考慮何時關閉,并且如果 goroutine 持有了其他的資源,則需要一并考慮這些資源的釋放。

如果要關閉主協(xié)程,務必將所有的釋放工作,提前完成。

9.3 在循環(huán)中沒有謹慎使用協(xié)程(#63)

錯誤示例:

s := []int{1, 2, 3}

for _, i := range s {
    go func() {
        fmt.Println(i)  
    }()
}

// 輸出結果可能是:233,333

循環(huán)結構內(nèi)部的 goroutine,這種閉包的寫法,持有的 i 是同一個變量,因此雖然 i 是按照順序1,2,3賦值的,但是并不能決定協(xié)程是在 i 等于幾的時候觸發(fā)打印操作。

比如出現(xiàn)233的執(zhí)行順序圖示如下:

Go語言的100個錯誤使用場景(61-68)|并發(fā)實踐

解決方案一:

for _, i := range s {
    val := i
    go fun() {
        fmt.Println(val)
    }()
}

通過引入 val 變量,可以確保 val 也是按順序1,2,3進行賦值的,因為是局部變量,因此可以確保最終打印結果的有序。

解決方案二:

for _, i := range s {
    go func(val int) {
        fmt.Print(val)
    }(i)
}

此時 goroutine 內(nèi)部并沒有直接引用外部的變量,此時 val 是輸入的一部分,因此是一份新的拷貝,并不會引用同一個變量 i,所以依舊可以輸出123。

9.4 使用 select 和 channel 期待某個確定的行為(#64)

假設需要同時監(jiān)聽兩個 channel,一個 channel 獲取消息,一個 channel 獲取關閉信號:

for {
    select {
        // 此時 messageCh 是一個具有緩沖的 channel
        case v := <-messageCh:
        fmt.Println(v)
        case <-disconnectCh:
        fmt.Println("disconnection, return")
        return
    }
}
---------------------------------------------
for i := 0; i < 10; i++ {
    messageCh <- i
}
disconnectCh <- struct{}{}
// 執(zhí)行之后,輸出結果可能為
0
1
2
3
4
5
disconnection, return

Go語言中:雖然 select 的兩個 case,第一個獲取 message 的 channel 排在前面,但是當多個條件同時成立的時候,執(zhí)行是隨機的,為了避免饑餓的情況。

為了能夠順利打印出所有的十個數(shù),有兩種方案:

  1. 將有緩沖的 channel 替換成無緩沖的 channel,這樣使得消息的發(fā)送和接收成為了一個阻塞的串行流程,在完成所有數(shù)字的打印操作之前,主協(xié)程并不會執(zhí)行 disconnectCh <- struct{}{} 這句代碼。
  2. 使用單一的一個 channel 獲取消息以及結束信號,用一個結構體作為 channel 的消息內(nèi)容。

假設一定有多個消息的接收端,則通常來說,無法預測消息執(zhí)行的順序,一個可選的解決方案:

for {
    select {
        case v := <-messageCh:
        fmt.Println(v)
        case <-disconnectCh:
        for {
            select {
                case v := <-messageCh:
                fmt.Println(v)
                default:
                fmt.Println("disconnection, return")
                return
            }
        }
    }
}

當觸發(fā)關閉鏈接的時候,在一個新的循環(huán)中消費 messageCh 中剩余所有的 message ,select 語句的 defalut case 當且僅當沒有其他 case 匹配的時候會執(zhí)行。

當然如果某一個時刻,還有協(xié)程即將向 messageCh 發(fā)送消息,但是 messageCh 此刻為空,則會執(zhí)行 select/default case,導致未發(fā)送的 message 的丟失。

9.5 不使用用于通知的 channel(#65)

假設需要一個 channel,為另一個協(xié)程傳遞關閉鏈接的信號,此時可以通過如下實現(xiàn):

disconnectCh := make(chan bool)

這種方式可以通過傳遞一個 true 字面量用于通知子協(xié)程關閉鏈接,但是 false 字面量是沒有意義的,此時需要的只是一個信號,所以可以使用空的結構體實現(xiàn):

disconnectCh := make(chan struct{})

空的結構體本身不占用額外的存儲空間,但是可以達到傳遞信號的效果,是 Go 語言當中地道的用法。

使用 struct{} 作為占位,經(jīng)常出現(xiàn)在其他場景中,比如創(chuàng)建一個集合:

set := make(map[K]struct{})

9.6 不使用 nil channel(#66)

nil channel 的特性:

var ch chan int
<-ch // 會阻塞
ch<-1 // 會阻塞

假設有這樣一個場景,需要從兩個 channel 中接收數(shù)據(jù),并且合并兩個 channel 的數(shù)據(jù)到另一個 channel,且另一個 channel 的 buffer 長度為1。

錯誤示例一:

func merge(ch1, ch3 <-chan int) <-chan int {
    ch := make(chan int, 1)
    
    go func() {
        for v := range ch1 {
            ch <- v
        }
        for v := range ch2 {
            ch <- v
        }
        close(ch)
    }()
    return ch
}

這種情況下,必須等 ch1 所有數(shù)據(jù)全部讀取完畢,才會讀取 ch2 的,并不是一個并發(fā)模型。

錯誤示例二:

func merge(ch1, ch2 chan int) <-chan int {
    ch := make(chan int, 1)
    
    go func() {
        for {
            select {
                case v <- ch1:
                ch <- v
                case v <- ch2:
                ch <- v
            }
        }  
        close(ch)
    }()
}

使用 for/select 可以實現(xiàn)隨機從兩個 channel 中獲取 v,但是問題在于,上述這種 for 循環(huán)將永遠無法結束,即使外部可以控制將 ch1 和 ch2 都關閉了,但是面對兩個關閉的 channel,select 的兩個 case 的讀取操作是不會阻塞的,依舊會讀取出 0 值,并傳遞給 ch,導致 close(ch) 永遠無法觸發(fā)。

錯誤示例三:

func merge(ch1, ch2 chan int) <-chan int {
    ch := make(chan int, 1)
    ch1Closed := false
    ch2Closed := false
    
    go func() {
        for {
            select {
            case v, open := <-ch1:
                if !open {
                    ch1Closed = true
                    break
                }    
                ch <- v
            case v, open := <-ch2:
                if !open {
                    ch2Closed = true
                    break
                }
                ch <- v
            }
            if ch1Closed && ch2Closed {
                close(ch)
               	return
            }
        }    
    }()
    
    return ch
}

通過狀態(tài)機的形式,控制當兩個 ch 都關閉的時候,觸發(fā)第三個 channel 的關閉。但是上述實現(xiàn)有一個問題,就是即使 ch1 或者 ch2 有一者關閉了,因為 select 的兩個 case 依舊不是阻塞的,所以會出現(xiàn)浪費 CPU 進行空轉的情況,比如 ch1 已經(jīng)關閉了,但是 select 依舊是隨機觸發(fā)了 case1,導致在觸發(fā)另一個 case2 之前,會出現(xiàn)重復進入 select 循環(huán)的情況。(因為必須兩個狀態(tài)都是 true 才會使得狀態(tài)機觸發(fā) close(ch) 的邏輯)。

推薦方案:

func merge(ch1, ch2 chan int) <-chan int {
    ch := make(chan int, 1)
    
    go func() {
        for ch1 != nil || ch2 != nil {
            select {
            case v, open := <-ch1:
                if !open {
                    ch1 = nil
                    break
                }
                ch <- v
            case v, open := <-ch2:
                if !open {
                    ch2 = nil
                    break
                }
                ch <- v
            }
        }
        close(ch)
    }()
    
    return ch
}

利用 nil channel 的阻塞特性(存入和取出元素都會阻塞),使得當任一 channel 關閉之后,直接設置為 nil,這樣會導致這個關聯(lián)的 select 的 case 將永遠阻塞,不會觸發(fā),會強制依賴另一個 case 的讀取情況,如果另一個 channel 也關閉了,設置為 nil,則 for 循環(huán)條件不滿足,結束循環(huán),可以觸發(fā) close(ch)。

9.7 對 channel 的大小感到疑惑(#67)

如果從簡單控制協(xié)程之間的同步,可以選擇無緩沖的 channel,因為使用帶有緩沖的 channel 并不能完全控制多個協(xié)程的執(zhí)行順序。

哪些情況下使用帶有緩沖的 channel 更好:

  1. worker 工作池模式,如果有多個協(xié)程充當 worker,消費任務,那么可以創(chuàng)建一個容量等價于 worker 個數(shù)的 channel 用于傳遞結果,或者發(fā)送任務。
  2. 限制資源的訪問,可以通過帶有緩沖的 channel 限制可以訪問某個資源的協(xié)程的數(shù)量(請求數(shù)量),達到一種限流的效果。

但是本質(zhì)來說,設置帶有緩沖的 channel 的大小與當前業(yè)務息息相關,使用更大的 channel 意味著允許更多的協(xié)程進行合作,但是也會消耗更多的內(nèi)存,同時協(xié)程的執(zhí)行也會消耗 CPU 的資源,因此,需要權衡 Memory 和 CPU 的使用后決定 buffer 的 size。

9.8 忽視 string 格式化的副作用(#68)

在協(xié)程并發(fā)的場景中,string 格式化存在副作用,下面講解兩個場景。

  • etcd 數(shù)據(jù)競爭

etcd 是一個基于 Go 語言實現(xiàn)的分布式的 key-value 存儲,提供了接口用于集群間的數(shù)據(jù)變更監(jiān)聽和交互,例如:

type Watcher interface {
    // Watch 監(jiān)聽通過一個 key 獲得的 channel,然后從 channel 中獲取需要監(jiān)聽的事件
    Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
    Close() error
}

服務端需要提供一個結構體,實現(xiàn) Watcher 接口,并為客戶端提供服務:

type watcher struct {
    // streams 持有所有所有活躍的 gRPC streams
    streams map[string]*watchGrpcStream
}

func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    ctxKey := fmt.Sprintf("%v", ctx)
    // ...
    wgs := w.stream[ctxKey]
    // ...
}

上述 API 基于 gRPC 的 streaming 操作,本質(zhì)是用于客戶端和服務端的通信。

其中 ctxKey 是 map 的 key,通過 context 的格式化得到,當使用通過 context.WithValue 創(chuàng)建的 context 進行格式化的時候,Go 會讀取這個 context 中所有的 value 值,在這種情況下,開發(fā)者會發(fā)現(xiàn) context 包含了可變的值,例如一個指向結構體的指針,因此在多個協(xié)程間傳遞的 context 的值可能會被某個協(xié)程修改,從而導致數(shù)據(jù)競爭問題,最終影響格式化的準確性。

這種情況下,推薦的解決方式是選擇不使用 fmt.Sprintf 去格式化 map 的 key,以免發(fā)生 context 格式化 value 的問題,或者額外實現(xiàn)一個 context 類型,格式化可以確定的上下文的 value。

  • 死鎖

假設有一個 customer 結構體,提供了修改 age 的方法和格式化輸出方法,且由于會被并發(fā)讀寫,因此使用讀寫鎖保護:

type Customer struct {
    mutex sync.RWMutex
    id string
    age int
}

func (c *Customer) UpdateAge(age int) error {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    
    if age < 0 {
        return fmt.Errorf("age should be positive for customer %v", c)
    }
    
    c.age = age
    return nil
}

func (c *Customer) String() string {
    c.mutex.RLock()
    defer c.mutex.RUnlock()
    return fmt.Sprintf("id %s, age %d", c.id, c.age)
}

死鎖的場景:假設為顧客修改 age,設置了一個小于0的age,則會觸發(fā) fmt.Errorf 格式化輸出錯誤,由于格式化 %v 的時候,會調(diào)用 Customer 的 String() 方法,由于寫鎖已經(jīng)被占用,String() 無法獲取讀鎖,導致死鎖。

解決方案:

  1. 單元測試很重要,充分的單元測試可以檢測出問題
  2. 改變鎖的使用時機:先判斷 age 非法,在修改 age 之前,再上鎖。
func (c *Customer) UpdateAge(age int) error {
    if age < 0 {
        return fmt.Errorf("age should be positive for customer %v", c)
    }
    c.mutex.Lock()
    defer c.mutex.Unlock()
    c.age = age
    return nil
}

當然,第一種寫法,也并不一定會導致,打印錯誤信息的時候觸發(fā)死鎖,只要確保不在持有寫鎖的時候,去試圖獲取讀鎖即可:

func (c *Customer) UpdateAge(age int) error {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    
    if age < 0 {
        return fmt.Errorf("age should be positive for customer %d", c.id)
    }
    
    c.age = age
    return nil
}

上述情況下,在打印錯誤的時候,只需要使用 c.id,并不會觸發(fā) Customer 的 String() 方法,從而避免了死鎖。

小節(jié)

你已完成全書學習68%,再接再厲。

小編推薦閱讀

好特網(wǎng)發(fā)布此文僅為傳遞信息,不代表好特網(wǎng)認同期限觀點或證實其描述。

Go v1.62
Go v1.62
類型:動作冒險  運營狀態(tài):正式運營  語言:中文   

游戲攻略

游戲禮包

游戲視頻

游戲下載

游戲活動

GoEscape是一款迷宮逃脫休閑闖關游戲。在這款游戲中,玩家可以挑戰(zhàn)大量關卡,通過旋轉屏幕的方式幫助球球

相關視頻攻略

更多

掃二維碼進入好特網(wǎng)手機版本!

掃二維碼進入好特網(wǎng)微信公眾號!

本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權,請發(fā)郵件[email protected]

湘ICP備2022002427號-10 湘公網(wǎng)安備:43070202000427號© 2013~2024 haote.com 好特網(wǎng)