Go語言并發(fā)之通知退出機制的實現(xiàn)
1、通知退出機制
讀取已經(jīng)關(guān)閉的通道不會引起阻塞,也不會導(dǎo)致 panic,而是立即返回該通道存儲類型的零值。關(guān)閉 select 監(jiān)聽的某個通道能使 select 立即感知這種通知,然后進(jìn)行相應(yīng)的處理,這就是所謂的退出通知機制(close channel tobroadcast)。 context 標(biāo)準(zhǔn)庫就是利用這種機制處理更復(fù)雜的通知機制的,退出通知機制是學(xué)習(xí)使用 context庫的基礎(chǔ)。
下面通過一個隨機數(shù)生成器的示例演示退出通知機制,下游的消費者不需要隨機數(shù)時,顯式地通知生產(chǎn)者停止生產(chǎn)。
package main
import (
"fmt"
"math/rand"
"runtime"
)
//GenerateIntA是一個隨機數(shù)發(fā)生器
func GenerateIntA(done chan struct{}) chan int {
ch := make(chan int)
go func() {
Lable:
for {
select {
case ch <- rand.Int():
//增加一路監(jiān)聽,就是對退出通知信號done的監(jiān)聽
case <-done:
break Lable
}
}
//收到通知后關(guān)閉通道ch
close(ch)
}()
return ch
}
func main() {
done := make(chan struct{})
ch := GenerateIntA(done)
fmt.Println(<-ch)
fmt.Println(<-ch)
// 發(fā)送通知,告訴生產(chǎn)者停止生產(chǎn)
close(done)
fmt.Println(<-ch)
fmt.Println(<-ch)
//此時生產(chǎn)者已經(jīng)退出
println("NumGoroutine=", runtime.NumGoroutine())
}
# 程序結(jié)果
5577006791947779410
8674665223082153551
0 // 關(guān)閉通道會輸出0值
0
NumGoroutine= 1
goroutine是Go語言提供的語言級別的輕量級線程,在我們需要使用并發(fā)時,我們只需要通過 go 關(guān)鍵字來開啟goroutine 即可。作為Go語言中的最大特色之一,goroutine在日常的工作學(xué)習(xí)中被大量使用著,但是對于它的調(diào)度處理,尤其是goroutine的退出時機和方式,很多小伙伴都沒有搞的很清楚,本文就來詳細(xì)講講Goroutine退出機制的原理及使用。
goroutine的調(diào)度是由 Golang 運行時進(jìn)行管理的,同一個程序中的所有 goroutine 共享同一個地址空間,goroutine設(shè)計的退出機制是由goroutine自己退出,不能在外部強制結(jié)束一個正在執(zhí)行的goroutine(只有一種情況正在運行的goroutine會因為其他goroutine的結(jié)束被終止,就是main函數(shù)退出或程序停止執(zhí)行)。下面介紹幾種常用的退出方式。
1.1 進(jìn)程/main函數(shù)退出
1.1.1 kill進(jìn)程/進(jìn)程crash
當(dāng)進(jìn)程被強制退出,所有它占有的資源都會還給操作系統(tǒng),而goroutine作為進(jìn)程內(nèi)的線程,資源被收回了,那么
還未結(jié)束的goroutine也會直接退出。
1.1.2 main函數(shù)結(jié)束
同理,當(dāng)主函數(shù)結(jié)束,goroutine的資源也會被收回,直接退出。
package main
import (
"fmt"
"time"
)
func routineTest() {
time.Sleep(time.Second)
fmt.Println("I'm alive")
}
func main() {
fmt.Println("start test")
go routineTest()
fmt.Println("end test")
}
# 程序輸出
start test
end test
其中g(shù)o routine里需要print出來的語句是永遠(yuǎn)也不會出現(xiàn)的。
1.2 通過channel退出
通俗的講,就是各個 goroutine 之間通信的"管道",有點類似于 Linux 中的管道。channel 是go最推薦的goroutine 間的通信方式,同時通過 channel 來通知 goroutine 退出也是最主要的goroutine退出方式。
goroutine 雖然不能強制結(jié)束另外一個 goroutine,但是它可以通過 channel 通知另外一個 goroutine 你的表演該結(jié)束了。
package main
import (
"fmt"
"time"
)
func cancelByChannel(quit <-chan time.Time) {
for {
select {
case <-quit:
fmt.Println("cancel goroutine by channel!")
return
default:
fmt.Println("I'm alive")
time.Sleep(1 * time.Second)
}
}
}
func main() {
quit := time.After(time.Second * 10)
go cancelByChannel(quit)
time.Sleep(15 * time.Second)
fmt.Println("I'm done")
}
# 程序輸出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by channel!
I'm done
在該例子中,我們用時間定義了一個channel,當(dāng)10秒后,會給到goroutine一個退出信號,然后go routine就會退出,這樣我們就實現(xiàn)了在其他線程中通知另一個線程退出的功能。
1.3 通過context退出
通過channel通知goroutine退出還有一個更好的方法就是使用context。沒錯,就是我們在日常開發(fā)中接口通用的第一個參數(shù)context。它本質(zhì)還是接收一個channel數(shù)據(jù),只是是通過ctx.Done()獲取。將上面的示例稍作修改即可。
package main
import (
"context"
"fmt"
"time"
)
func cancelByContext(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("cancel goroutine by context!")
return
default:
fmt.Println("I'm alive")
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go cancelByContext(ctx)
time.Sleep(10 * time.Second)
cancel()
time.Sleep(5 * time.Second)
}
# 程序輸出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!
上面的 case 中,通過 context 自帶的 WithCancel 方法將 cancel 函數(shù)傳遞出來,然后手動調(diào)用 cancel() 函數(shù)給goroutine 傳遞了 ctx.Done() 信號。context 也提供了 context.WithTimeout() 和context.WithDeadline() 方法來更方便的傳遞特定情況下的 Done 信號。
package main
import (
"context"
"fmt"
"time"
)
func cancelByContext(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("cancel goroutine by context!")
return
default:
fmt.Println("I'm alive")
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
go cancelByContext(ctx)
time.Sleep(15 * time.Second)
}
# 程序輸出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!
上述 case 中使用了 context.WithTimeout() 來設(shè)置10秒后自動退出,使用 context.WithDeadline() 的功能基本一樣。區(qū)別是 context.WithDeadline() 可以指定一個固定的時間點,當(dāng)然也可以使用time.Now().Add(time.Second*10) 的方式來實現(xiàn)同 context.WithTimeout() 相同的功能。
package main
import (
"context"
"fmt"
"time"
)
func cancelByContext(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("cancel goroutine by context!")
return
default:
fmt.Println("I'm alive")
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
go cancelByContext(ctx)
time.Sleep(15 * time.Second)
}
# 程序輸出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!
注:這里需要注意的一點是上方兩個case中為了方便讀者理解,我將context傳回的cancel()函數(shù)拋棄掉了,實際使用中通常會加上 defer cancel() 來保證goroutine被殺死。
Context 使用原則和技巧:
不要把Context放在結(jié)構(gòu)體中,要以參數(shù)的方式傳遞,parent Context一般為Background應(yīng)該要把Context作為第一個參數(shù)傳遞給入口請求和出口請求鏈路上的每一個函數(shù),放在第一位,變量名建議都統(tǒng)一,如ctx。
給一個函數(shù)方法傳遞Context的時候,不要傳遞nil,否則在tarce追蹤的時候,就會斷了連接Context的Value相關(guān)方法應(yīng)該傳遞必須的數(shù)據(jù),不要什么數(shù)據(jù)都使用這個傳遞Context是線程安全的,可以放心的在多個goroutine中傳遞可以把一個 Context 對象傳遞給任意個數(shù)的 gorotuine,對它執(zhí)行取消操作時,所有 goroutine 都會接收到取消信號。
1.4 通過Panic退出
這是一種不推薦使用的方法?。?!在此給出只是提出這種操作的可能性。實際場景中尤其是生產(chǎn)環(huán)境請慎用?。?/p>
package main
import (
"context"
"fmt"
"time"
)
func cancelByPanic(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
fmt.Println("cancel goroutine by panic!")
}
}()
for i := 0; i < 5; i++ {
fmt.Println("hello cancelByPanic")
time.Sleep(1 * time.Second)
}
panic("panic")
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
go cancelByPanic(ctx)
time.Sleep(5 * time.Second)
}
# 程序輸出
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic
這里我們通過在 defer 函數(shù)中使用 recover 來捕獲 panic error 并從 panic 中拿回控制權(quán),確保程序不會再panic 展開到 goroutine 調(diào)用棧頂部后崩潰。
2、阻止goroutine退出的方法
了解到goroutine的退出方式后,我們已經(jīng)可以解決一類問題。那就是當(dāng)你需要手動控制某個goroutine結(jié)束的時
候應(yīng)該怎么辦。但是在實際生產(chǎn)中關(guān)于goroutine還有一類問題需要解決,那就是當(dāng)你的主進(jìn)程結(jié)束時,應(yīng)該如何
等待goroutine全部執(zhí)行完畢后再使主進(jìn)程退出。
2.1 通過sync.WaitGroup
package main
import (
"fmt"
)
func main() {
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
go func(s string) {
fmt.Println(s)
}(v)
}
fmt.Println("End")
}
# 程序輸出
End
以上方的 case 為例,可見我們在什么都不加的時候,不會等待 go func 執(zhí)行完主程序就會退出。因此下面給出使
用 WaitGroup 的方法。
package main
import (
"fmt"
"sync"
)
func main() {
// 定義 WaitGroup
var wg sync.WaitGroup
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
// 增加一個 wait 任務(wù)
wg.Add(1)
go func(s string) {
// 函數(shù)結(jié)束時,通知此 wait 任務(wù)已經(jīng)完成
defer wg.Done()
fmt.Println(s)
}(v)
}
// 等待所有任務(wù)完成
wg.Wait()
}
# 程序輸出
c
a
b
WaitGroup 可以理解為一個 goroutine 管理者。他需要知道有多少個 goroutine 在給他干活,并且在干完的時候
需要通知他干完了,否則他就會一直等,直到所有的小弟的活都干完為止。我們加上 WaitGroup 之后,程序會進(jìn)
行等待,直到它收到足夠數(shù)量的 Done() 信號為止。
WaitGroup 可被調(diào)用的方法只有三個:Add() 、Done()、Wait()。
1、wg.Done() 函數(shù)實際上實現(xiàn)的是 wg.Add(-1),因此直接使用 wg.Add(-1) 是會造成同樣的結(jié)果的。在實際使
用中要注意避免誤操作,使得監(jiān)聽的 goroutine 數(shù)量出現(xiàn)誤差。
2、wg.Add() 函數(shù)可以一次性加n。但是實際使用時通常都設(shè)為1。但是wg本身的counter不能設(shè)為負(fù)數(shù)。假設(shè)你
在沒有Add到10以前,一次性 wg.Add(-10),會出現(xiàn)panic !
package main
import (
"fmt"
"sync"
)
func main() {
// 定義 WaitGroup
var wg sync.WaitGroup
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
// 增加一個 wait 任務(wù)
wg.Add(1)
go func(s string) {
// 函數(shù)結(jié)束時,通知此 wait 任務(wù)已經(jīng)完成
defer wg.Done()
fmt.Println(s)
}(v)
}
wg.Add(-10)
// 等待所有任務(wù)完成
wg.Wait()
}
# 程序輸出
panic: sync: negative WaitGroup countergoroutine 1 [running]:
如果你的程序?qū)懙挠袉栴},出現(xiàn)了始終等待的 waitgroup 會造成死鎖。
package main
import (
"fmt"
"sync"
)
func main() {
// 定義 WaitGroup
var wg sync.WaitGroup
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
// 增加一個 wait 任務(wù)
wg.Add(1)
go func(s string) {
// 函數(shù)結(jié)束時,通知此 wait 任務(wù)已經(jīng)完成
defer wg.Done()
fmt.Println(s)
}(v)
}
wg.Add(1)
// 等待所有任務(wù)完成
wg.Wait()
}
# 程序輸出
c
a
b
fatal error: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:
2.2 通過channel
package main
import "fmt"
func main() {
arr := [3]string{"a", "b", "c"}
ch := make(chan struct{}, len(arr))
for _, v := range arr {
go func(s string) {
fmt.Println(s)
ch <- struct{}{}
}(v)
}
for i := 0; i < len(arr); i++ {
<-ch
}
}
# 程序輸出
c
a
b
需要注意的是,channel 同樣會導(dǎo)致死鎖。
package main
import "fmt"
func main() {
arr := [3]string{"a", "b", "c"}
ch := make(chan struct{}, len(arr))
for _, v := range arr {
go func(s string) {
fmt.Println(s)
ch <- struct{}{}
}(v)
}
for i := 0; i < len(arr); i++ {
<-ch
}
<-ch
}
# 程序輸出
c
a
b
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]:
2.3 封裝
利用 go routine 的這一特性,我們可以將 waitGroup 等方式封裝起來,保證 go routine 在主進(jìn)程結(jié)束時會繼續(xù)執(zhí)行完。
package main
import (
"fmt"
"sync"
)
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {
wg.Add(1)
go func() {
f(args...)
wg.Done()
}()
}
func printArray(args ...interface{}) {
fmt.Println(args)
}
func main() {
// 定義 WaitGroup
var w WaitGroupWrapper
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
w.Wrap(printArray, v)
}
w.Wait()
}
# 程序輸出
[c]
[a]
[b]
還可以加上更高端一點的功能,增加時間、事件雙控制的 wrapper。
package main
import (
"fmt"
"sync"
"time"
)
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {
wg.Add(1)
go func() {
f(args...)
wg.Done()
}()
}
func (w *WaitGroupWrapper) WaitWithTimeout(d time.Duration) bool {
ch := make(chan struct{})
t := time.NewTimer(d)
defer t.Stop()
go func() {
w.Wait()
ch <- struct{}{}
}()
select {
case <-ch:
fmt.Println("job is done!")
return true
case <-t.C:
fmt.Println("time is out!")
return false
}
}
func printArray(args ...interface{}) {
// 如果設(shè)置3秒,那么w.Wait()需要等待的時間是3秒,而超時時間的設(shè)置是2秒,所以會超時
//3秒后會觸發(fā)time is out分支
time.Sleep(1 * time.Second)
//如果改為time.Sleep(time.Second)即會觸發(fā)job is done分支
fmt.Println(args)
}
func main() {
// 定義 WaitGroup
var w WaitGroupWrapper
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
w.Wrap(printArray, v)
}
w.WaitWithTimeout(2 * time.Second)
}
# 程序輸出
[b]
[a]
[c]
job is done!
到此這篇關(guān)于Go語言并發(fā)之通知退出機制的實現(xiàn)的文章就介紹到這了,更多相關(guān)Go 通知退出機制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Gin golang web開發(fā)模型綁定實現(xiàn)過程解析
這篇文章主要介紹了Gin golang web開發(fā)模型綁定實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-10-10
Go語言如何實現(xiàn)Benchmark函數(shù)
go想要在main函數(shù)中測試benchmark會麻煩一些,所以這篇文章主要為大家介紹了如何實現(xiàn)了一個簡單的且沒有開銷的benchmark函數(shù),希望對大家有所幫助2024-12-12
Go語言關(guān)于幾種深度拷貝(deepcopy)方法的性能對比
這篇文章主要介紹了Go語言關(guān)于幾種深度拷貝(deepcopy)方法的性能對比,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
GO語言字符串處理Strings包的函數(shù)使用示例講解
這篇文章主要為大家介紹了GO語言字符串處理Strings包的函數(shù)使用示例講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04

