Go并發(fā)編程之goroutine使用正確方法
并發(fā)(concurrency): 指在同一時刻只能有一條指令執(zhí)行,但多個進程指令被快速的輪換執(zhí)行,使得在宏觀上具有多個進程同時執(zhí)行的效果,但在微觀上并不是同時執(zhí)行的,只是把時間分成若干段,通過cpu時間片輪轉(zhuǎn)使多個進程快速交替的執(zhí)行。
1. 對創(chuàng)建的gorouting負載
1.1 不要創(chuàng)建一個你不知道何時退出的 goroutine
下面的代碼有什么問題? 是不是在我們的程序種經(jīng)常寫類似的代碼?
// Week03/blog/01/01.go
package main
import (
"log"
"net/http"
_ "net/http/pprof"
)
// 初始化函數(shù)
func setup() {
// 這里面有一些初始化的操作
}
// 入口函數(shù)
func main() {
setup()
// 主服務(wù)
server()
// for debug
pprof()
select {}
}
// http api server
func server() {
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("pong"))
})
// 主服務(wù)
if err := http.ListenAndServe(":8080", mux); err != nil {
log.Panicf("http server err: %+v", err)
return
}
}()
}
// 輔助服務(wù),用來debug性能測試
func pprof() {
// 輔助服務(wù),監(jiān)聽了其他端口,這里是 pprof 服務(wù),用于 debug
go http.ListenAndServe(":8081", nil)
}
以上代碼有幾個問題,是否想到過?
- 如果
server是在其他的包里面, 如果沒有特殊的說明, 調(diào)用者是否知道這是一個異步調(diào)用? main函數(shù)種,最后使用select {}使整個程序處于阻塞狀態(tài),也就是空轉(zhuǎn), 會不會存在浪費?- 如果線上出現(xiàn)事故,debug服務(wù)已經(jīng)突出,你想要debug這時是否很茫然?
- 如果某一天服務(wù)突然重啟, 你卻找不到事故日志, 是否能想到起的這個
8801端口的服務(wù)呢?
1.2 不要幫別人做選擇
把是否 并發(fā) 的選擇權(quán)交給你的調(diào)用者,而不是自己就直接悄悄的用上了 goroutine
下面做如下改變,將兩個函數(shù)是否并發(fā)操作的選擇權(quán)留給main函數(shù)
package main
import (
"log"
"net/http"
_ "net/http/pprof"
)
func setup(){
// 初始化操作
}
func main(){
setup()
// for debug
go pprof()
// 主服務(wù),http api
go server()
select{}
}
func server(){
mux := http.NewServerMux()
mux.HandleFunc("ping", func(w http.ResponseWriter, r * http.Request){
w.Write([]byte("pong"))
}
// 主服務(wù)
if err := http.ListerAndServer(":8080",mux); err != nil{
log.panic("http server launch error: %v", err)
return
}
}
func pprof(){
// 輔助服務(wù) 監(jiān)聽其他端口,這里是pprof服務(wù),擁有debug
http.ListerAndServer(":8081",nil)
}
1.3 不要作為一個旁觀者
一般情況下,不要讓 主進程稱為一個無所事事的旁觀者, 明明可以干活,但是最后使用一個select在那兒空跑,而且這種看著也怪,在沒有特殊場景下盡量不要使用這種阻塞的方式
package main
import (
"log"
"net/http"
_ "net/http/pprof"
)
func setup() {
// 這里面有一些初始化的操作
}
func main() {
setup()
// for debug
go pprof()
// 主服務(wù), http本來就是一個阻塞的服務(wù)
server()
}
func server() {
mux := http.NewServeMux()
mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("pong"))
})
// 主服務(wù)
if err := http.ListenAndServe(":8080", mux); err != nil {
log.Panicf("http server err: %+v", err)
return
}
}
func pprof() {
// 輔助服務(wù),監(jiān)聽了其他端口,這里是 pprof 服務(wù),用于 debug
http.ListenAndServe(":8081", nil)
}
1.4 不要創(chuàng)建不知道什么時候退出的 goroutine
很多時候我們在創(chuàng)建一個 協(xié)程(goroutine)后就放任不管了,如果程序永遠運行下去,可能不會有什么問題,但實際情況并非如此, 我們的產(chǎn)品需要迭代,需要修復(fù)bug,需要不停進行構(gòu)建,發(fā)布, 所以當(dāng)程序退出后(主程序),運行的某些子程序并不會完全退出,比如這個 pprof, 他自身本來就是一個后臺服務(wù),但是當(dāng) main退出后,實際 pprof這個服務(wù)并不會退出,這樣 pprof就會稱為一個孤魂野鬼,稱為一個 zombie, 導(dǎo)致goroutine泄漏。
所以再一次對程序進行修改, 保證 goroutine能正常退出
package main
import (
"context"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"time"
)
func setup() {
// 這里面有一些初始化的操作
}
func main() {
setup()
// 用于監(jiān)聽服務(wù)退出, 這里使用了兩個 goroutine,所以 cap 為2
done := make(chan error, 2)
// 無緩沖的通道,用于控制服務(wù)退出,傳入同一個 stop,做到只要有一個服務(wù)退出了那么另外一個服務(wù)也會隨之退出
stop := make(chan struct{}, 0)
// for debug
go func() {
// pprof 傳遞一個 channel
fmt.Println("pprof start...")
done <- pprof(stop)
fmt.Printf("err1:%v\n", done)
}()
// 主服務(wù)
go func() {
fmt.Println("app start...")
done <- app(stop)
fmt.Printf("err2:%v\n", done)
}()
// stopped 用于判斷當(dāng)前 stop 的狀態(tài)
var stopped bool
// 這里循環(huán)讀取 done 這個 channel
// 只要有一個退出了,我們就關(guān)閉 stop channel
for i := 0; i < cap(done); i++ {
// 對于有緩沖的chan, chan中無值會一直處于阻塞狀態(tài)
// 對于app 服務(wù)會一直阻塞狀態(tài),不會有 數(shù)據(jù)寫入到done 通道,只有在5s后,模擬的 pprof會有err寫入chan,此時才會觸發(fā)以下邏輯
if err := <-done; err != nil {
log.Printf("server exit err: %+v", err)
}
if !stopped {
stopped = true
// 通過關(guān)閉 無緩沖的channel 來通知所有的 讀 stop相關(guān)的goroutine退出
close(stop)
}
}
}
// http 服務(wù)
func app(stop <-chan struct{}) error {
mux := http.NewServeMux()
mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("pong"))
})
return server(mux, ":8080", stop)
}
func pprof(stop <-chan struct{}) error {
// 注意這里主要是為了模擬服務(wù)意外退出,用于驗證一個服務(wù)退出,其他服務(wù)同時退出的場景
// 因為這里沒有返回err, 所以done chan中無法接收到值, 主程序中會一直阻塞住
go func() {
server(http.DefaultServeMux, ":8081", stop)
}()
time.Sleep(5 * time.Second)
// 模擬出錯
return fmt.Errorf("mock pprof exit")
}
// 啟動一個服務(wù)
func server(handler http.Handler, addr string, stop <-chan struct{}) error {
s := http.Server{
Handler: handler,
Addr: addr,
}
// 這個 goroutine 控制退出,因為 stop channel 只要close或者是寫入數(shù)據(jù),這里就會退出
go func() {
// 無緩沖channel等待,寫入或者關(guān)閉
<-stop
log.Printf("server will exiting, addr: %s", addr)
// 此時 httpApi 服務(wù)就會優(yōu)雅的退出
s.Shutdown(context.Background())
}()
// 沒有觸發(fā)異常的話,會一直處于阻塞
return s.ListenAndServe()
}
查看以下運行結(jié)果
D:\gopath\controlGoExit>go run demo.go
app start...
pprof start...
err1:0xc00004c720
2021/09/12 22:48:37 server exit err: mock pprof exit
2021/09/12 22:48:37 server will exiting, addr: :8080
2021/09/12 22:48:37 server will exiting, addr: :8081
err2:0xc00004c720
2021/09/12 22:48:37 server exit err: http: Server closed
雖然我們已經(jīng)經(jīng)過了三輪優(yōu)化,但是這里還是有一些需要注意的地方:
- 雖然我們調(diào)用了 Shutdown 方法,但是我們其實并沒有實現(xiàn)優(yōu)雅退出
- 在 server 方法中我們并沒有處理 panic的邏輯,這里需要處理么?如果需要那該如何處理呢?
1.5 不要創(chuàng)建都無法退出的 goroutine
永遠無法退出的 goroutine, 即 goroutine 泄漏
下面是一個例子,可能在不知不覺中會用到
package main
import (
"log"
_ "net/http/pprof"
"net/http"
)
func setup() {
// 這里面有一些初始化的操作
log.Print("服務(wù)啟動初始化...")
}
func main() {
setup()
// for debug
go pprof()
// 主服務(wù), http本來就是一個阻塞的服務(wù)
server()
}
func server() {
mux := http.NewServeMux()
mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("pong"))
})
mux.HandleFunc("/leak", LeakHandle)
// 主服務(wù)
if err := http.ListenAndServe(":8080", mux); err != nil {
log.Panicf("http server err: %+v", err)
return
}
}
func pprof() {
// 輔助服務(wù),監(jiān)聽了其他端口,這里是 pprof 服務(wù),用于 debug
http.ListenAndServe(":8081", nil)
}
func LeakHandle(w http.ResponseWriter, r *http.Request) {
ch := make(chan bool, 0)
go func() {
fmt.Println("異步任務(wù)做一些操作")
<-ch
}()
w.Write([]byte("will leak"))
}
復(fù)用一下上面的 server 代碼,我們經(jīng)常會寫出這種類似的代碼
- http 請求來了,我們啟動一個 goroutine 去做一些耗時一點的工作
- 然后返回了
- 然后之前創(chuàng)建的那個 goroutine 阻塞了(對于一個無緩沖的chan,如果沒有接收或關(guān)閉操作會永遠阻塞下去)
- 然后就泄漏了
絕大部分的 goroutine 泄漏都是因為 goroutine 當(dāng)中因為各種原因阻塞了,我們在外面也沒有控制它退出的方式,所以就泄漏了
接下來我們驗證一下是不是真的泄漏了
服務(wù)啟動之后,訪問debug訪問網(wǎng)址,http://localhost:8081/debug/pprof/goroutine?debug=1.
當(dāng)請求兩次 http://127.0.0.1/leak后查看 goroutine數(shù)量,如圖

繼續(xù)請求三次后,如圖

1.6 確保創(chuàng)建出的goroutine工作已經(jīng)完成
這個其實就是優(yōu)雅退出的問題,程序中可能啟動了很多的 goroutine 去處理一些問題,但是服務(wù)退出的時候我們并沒有考慮到就直接退出了。例如退出前日志沒有 flush 到磁盤,我們的請求還沒完全關(guān)閉,異步 worker 中還有 job 在執(zhí)行等等。
看一個例子,假設(shè)現(xiàn)在有一個埋點服務(wù),每次請求我們都會上報一些信息到埋點服務(wù)上
// Reporter 埋點服務(wù)上報
type Reporter struct {
}
var reporter Reporter
// 模擬耗時
func (r Reporter) report(data string) {
time.Sleep(time.Second)
fmt.Printf("report: %s\n", data)
}
mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
// 在請求中異步調(diào)用
// 這里并沒有滿足一致性
go reporter.report("ping pong")
fmt.Println("ping")
w.Write([]byte("pong"))
})
在發(fā)送一次請后之后就直接退出了, 異步上報的邏輯是沒有執(zhí)行的
$ go tun demo.go ping ^C signal:interrupt
有兩種改法:
- 一種是給 reporter 加上 shutdown 方法,類似 http 的 shutdown,等待所有的異步上報完成之后,再退出
- 另外一種是我們直接使用 一些 worker 來執(zhí)行,在當(dāng)然這個 worker 也要實現(xiàn)類似 shutdown 的方法。
一般推薦后一種,因為這樣可以避免請求量比較大時,創(chuàng)建大量 goroutine,當(dāng)然如果請求量比較小,不會很大,用第一種也是可以的。
第二種方法代碼如下:
// 埋點上報
package main
import (
"context"
"fmt"
"log"
"net/http"
"sync"
)
// Reporter 埋點服務(wù)上報
type Reporter struct {
worker int
messages chan string
wg sync.WaitGroup
closed chan struct{}
once sync.Once
}
// NewReporter NewReporter
func NewReporter(worker, buffer int) *Reporter {
return &Reporter{
worker: worker,
messages: make(chan string, buffer),
closed: make(chan struct{}),
}
}
// 執(zhí)行上報
func (r *Reporter) Run(stop <-chan struct{}) {
// 用于執(zhí)行錯誤
go func() {
// 沒有錯誤時
<-stop
fmt.Println("stop...")
r.shutdown()
}()
for i := 0; i < r.worker; i++ {
r.wg.Add(1)
go func() {
defer r.wg.Done()
for {
select {
case <-r.closed:
return
case msg := <-r.messages:
fmt.Printf("report: %s\n", msg)
}
}
}()
}
r.wg.Wait()
fmt.Println("report workers exit...")
}
// 這里不必關(guān)閉 messages
// 因為 closed 關(guān)閉之后,發(fā)送端會直接丟棄數(shù)據(jù)不再發(fā)送
// Run 方法中的消費者也會退出
// Run 方法會隨之退出
func (r *Reporter) shutdown() {
r.once.Do(func() { close(r.closed) })
}
// 模擬耗時
func (r *Reporter) Report(data string) {
// 這個是為了及早退出
// 并且為了避免我們消費者能力很強,發(fā)送者這邊一直不阻塞,可能還會一直寫數(shù)據(jù)
select {
case <-r.closed:
fmt.Printf("reporter is closed, data will be discarded: %s \n", data)
default:
}
select {
case <-r.closed:
fmt.Printf("reporter is closed, data will be discarded: %s \n", data)
case r.messages <- data:
}
}
func setup3() {
// 初始化一些操作
fmt.Println("程序啟動...")
}
func main() {
setup3()
// 用于監(jiān)聽服務(wù)完成時退出
done := make(chan error, 3)
// 實例化一個 reporter
reporter := NewReporter(2, 100)
// 用于控制服務(wù)退出,傳入同一個 stop,做到只要有一個服務(wù)退出了那么另外一個服務(wù)也會隨之退出
stop := make(chan struct{}, 0)
// for debug
go func() {
done <- pprof3(stop)
}()
// http主服務(wù)
go func() {
done <- app3(reporter, stop)
}()
// 上報服務(wù),接收一個監(jiān)控停止的 chan
go func() {
reporter.Run(stop)
done <- nil
}()
// 這里循環(huán)讀取 done 這個 channel
// 只要有一個退出了,我們就關(guān)閉 stop channel
for i := 0; i < cap(done); i++ {
// 對于有緩沖的chan, chan中無值會一直處于阻塞狀態(tài)
// 對于app 服務(wù)會一直阻塞狀態(tài),不會有 數(shù)據(jù)寫入到done 通道,只有在5s后,模擬的 pprof會有err寫入chan,此時才會觸發(fā)以下邏輯
if err := <-done; err != nil {
log.Printf("server exit err: %+v", err)
}
// 通過關(guān)閉 無緩沖的channel 來通知所有的 讀 stop相關(guān)的goroutine退出
close(stop)
}
}
func pprof3(stop <-chan struct{}) error {
// 輔助服務(wù),監(jiān)聽了其他端口,這里是 pprof 服務(wù),用于 debug
err := server3(http.DefaultServeMux, ":8081", stop)
return err
}
func app3(report *Reporter, stop <-chan struct{}) error {
mux := http.NewServeMux()
mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
// 在請求中異步調(diào)用
// 這里并沒有滿足一致性
go report.Report("ping pong")
fmt.Println("ping")
_, err := w.Write([]byte("pong"))
if err != nil {
log.Println("response err")
}
})
return server3(mux, ":8080", stop)
}
// 啟動一個服務(wù)
func server3(handler http.Handler, addr string, stop <-chan struct{}) error {
s := http.Server{
Handler: handler,
Addr: addr,
}
// 這個 goroutine 控制退出,因為 stop channel 只要close 或者是寫入數(shù)據(jù),這里就會退出
go func() {
// 無緩沖channel等待,寫入或者關(guān)閉
<-stop
log.Printf("server will exiting, addr: %s", addr)
// 此時 httpApi 服務(wù)就會優(yōu)雅的退出
err := s.Shutdown(context.Background())
if err != nil {
log.Printf("server exiting occur error, %s", err.Error())
}
}()
// 沒有觸發(fā)異常的化,會一直處于阻塞
return s.ListenAndServe()
}
上面代碼應(yīng)該還有問題,等日后再做優(yōu)化
第一種方法參考:reporter 添加shutdown方法
2. 總結(jié)
在使用go語言初期, 使用一個go關(guān)鍵字輕松開啟一個異步協(xié)程,再加上chan很容易實現(xiàn) 生產(chǎn)者---》消費者 設(shè)計模型,但是在使用過程中往往忽略了 程序退出時資源回收的問題,也很容易寫成一個數(shù)據(jù)使用一個go來處理,雖然官方說明了 創(chuàng)建一個goroutine的占用資源很小,但是再小的 占用空間也敵不過一個死循環(huán)啊。 所以在使用gorouine創(chuàng)建協(xié)程除了注意正確規(guī)定線程數(shù)以為,也要注意以下幾點。
- 將是否異步調(diào)用的選擇泉交給調(diào)用者, 不然很有可能使用者不知道所調(diào)用的函數(shù)立使用了
go - 如果要啟動一個
goroutine, 要對他負責(zé)
不用啟動一個無法控制他退出或者無法知道何時退出的goroutine
啟動goroutine時加上 panic recovery機制,避免服務(wù)直接不可用,可以使用如下代碼
// DeferRecover defer recover from panic.
func DeferRecover(tag string, handlePanic func(error)) func() {
return func() {
if err := recover(); err != nil {
log.Errorf("%s, recover from: %v\n%s\n", tag, err, debug.Stack())
if handlePanic != nil {
handlePanic(fmt.Errorf("%v", err))
}
}
}
}
// WithRecover recover from panic.
func WithRecover(tag string, f func(), handlePanic func(error)) {
defer DeferRecover(tag, handlePanic)()
f()
}
// Go is a wrapper of goroutine with recover.
func Go(name string, f func(), handlePanic func(error)) {
go WithRecover(fmt.Sprintf("goroutine %s", name), f, handlePanic)
}
- 造成 goroutine 泄漏的主要原因就是 goroutine 中造成了阻塞,并且沒有外部手段控制它退出
盡量避免在請求中直接啟動 goroutine 來處理問題,而應(yīng)該通過啟動 worker 來進行消費,這樣可以避免由于請求量過大,而導(dǎo)致大量創(chuàng)建 goroutine 從而導(dǎo)致 oom,當(dāng)然如果請求量本身非常小,那當(dāng)我沒說
3. 參考
https://dave.cheney.net/practical-go/presentations/qcon-china.html
https://lailin.xyz/post/go-training-week3-goroutine.html#總結(jié)
https://www.ardanlabs.com/blog/2019/04/concurrency-trap-2-incomplete-work.html
https://www.ardanlabs.com/blog/2014/01/concurrency-goroutines-and-gomaxprocs.html
到此這篇關(guān)于Go并發(fā)編程之正確使用goroutine的方法的文章就介紹到這了,更多相關(guān)Go并發(fā)編程goroutine內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- golang gin 框架 異步同步 goroutine 并發(fā)操作
- Go語言中的并發(fā)goroutine底層原理
- Go 并發(fā)編程Goroutine的實現(xiàn)示例
- Golang 語言控制并發(fā) Goroutine的方法
- golang并發(fā)編程中Goroutine 協(xié)程的實現(xiàn)
- Go中Goroutines輕量級并發(fā)的特性及效率探究
- 使用Go?goroutine實現(xiàn)并發(fā)的Clock服務(wù)
- Go語言使用goroutine及通道實現(xiàn)并發(fā)詳解
- Go 控制協(xié)程(goroutine)的并發(fā)數(shù)量
- Go語言使用Goroutine并發(fā)打印的項目實踐
相關(guān)文章
利用Golang解析json數(shù)據(jù)的方法示例
Go提供了原生的JSON庫,并且與語言本身有效的集成在了一起。下面這篇文章將給大家介紹關(guān)于利用Golang解析json數(shù)據(jù)的方法,文中給出了詳細的示例代碼供大家參考學(xué)習(xí),需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-07-07
golang打包成帶圖標(biāo)的exe可執(zhí)行文件
這篇文章主要給大家介紹了關(guān)于golang打包成帶圖標(biāo)的exe可執(zhí)行文件的相關(guān)資料,文中通過實例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2023-06-06
深入學(xué)習(xí)Golang并發(fā)編程必備利器之sync.Cond類型
Go?語言的?sync?包提供了一系列同步原語,其中?sync.Cond?就是其中之一。本文將深入探討?sync.Cond?的實現(xiàn)原理和使用方法,幫助大家更好地理解和應(yīng)用?sync.Cond,需要的可以參考一下2023-05-05
利用golang的字符串解決leetcode翻轉(zhuǎn)字符串里的單詞
這篇文章主要介紹了利用golang的字符串解決leetcode翻轉(zhuǎn)字符串里的單詞,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12
go語言實現(xiàn)的memcache協(xié)議服務(wù)的方法
這篇文章主要介紹了go語言實現(xiàn)的memcache協(xié)議服務(wù)的方法,實例分析了Go語言使用memcache的技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-03-03
深入理解Golang?make和new的區(qū)別及實現(xiàn)原理
在Go語言中,有兩個比較雷同的內(nèi)置函數(shù),分別是new和make方法,二者都可以用來分配內(nèi)存,那他們有什么區(qū)別呢?下面我們就從底層來分析一下二者的不同。感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助2022-10-10

