Go語言并發(fā)之通知退出機(jī)制的實(shí)現(xiàn)
1、通知退出機(jī)制
讀取已經(jīng)關(guān)閉的通道不會(huì)引起阻塞,也不會(huì)導(dǎo)致 panic,而是立即返回該通道存儲(chǔ)類型的零值。關(guān)閉 select 監(jiān)聽的某個(gè)通道能使 select 立即感知這種通知,然后進(jìn)行相應(yīng)的處理,這就是所謂的退出通知機(jī)制(close channel tobroadcast)。 context 標(biāo)準(zhǔn)庫就是利用這種機(jī)制處理更復(fù)雜的通知機(jī)制的,退出通知機(jī)制是學(xué)習(xí)使用 context庫的基礎(chǔ)。
下面通過一個(gè)隨機(jī)數(shù)生成器的示例演示退出通知機(jī)制,下游的消費(fèi)者不需要隨機(jī)數(shù)時(shí),顯式地通知生產(chǎn)者停止生產(chǎn)。
package main
import (
"fmt"
"math/rand"
"runtime"
)
//GenerateIntA是一個(gè)隨機(jī)數(shù)發(fā)生器
func GenerateIntA(done chan struct{}) chan int {
ch := make(chan int)
go func() {
Lable:
for {
select {
case ch <- rand.Int():
//增加一路監(jiān)聽,就是對(duì)退出通知信號(hào)done的監(jiān)聽
case <-done:
break Lable
}
}
//收到通知后關(guān)閉通道ch
close(ch)
}()
return ch
}
func main() {
done := make(chan struct{})
ch := GenerateIntA(done)
fmt.Println(<-ch)
fmt.Println(<-ch)
// 發(fā)送通知,告訴生產(chǎn)者停止生產(chǎn)
close(done)
fmt.Println(<-ch)
fmt.Println(<-ch)
//此時(shí)生產(chǎn)者已經(jīng)退出
println("NumGoroutine=", runtime.NumGoroutine())
}
# 程序結(jié)果
5577006791947779410
8674665223082153551
0 // 關(guān)閉通道會(huì)輸出0值
0
NumGoroutine= 1
goroutine是Go語言提供的語言級(jí)別的輕量級(jí)線程,在我們需要使用并發(fā)時(shí),我們只需要通過 go 關(guān)鍵字來開啟goroutine 即可。作為Go語言中的最大特色之一,goroutine在日常的工作學(xué)習(xí)中被大量使用著,但是對(duì)于它的調(diào)度處理,尤其是goroutine的退出時(shí)機(jī)和方式,很多小伙伴都沒有搞的很清楚,本文就來詳細(xì)講講Goroutine退出機(jī)制的原理及使用。
goroutine的調(diào)度是由 Golang 運(yùn)行時(shí)進(jìn)行管理的,同一個(gè)程序中的所有 goroutine 共享同一個(gè)地址空間,goroutine設(shè)計(jì)的退出機(jī)制是由goroutine自己退出,不能在外部強(qiáng)制結(jié)束一個(gè)正在執(zhí)行的goroutine(只有一種情況正在運(yùn)行的goroutine會(huì)因?yàn)槠渌鹓oroutine的結(jié)束被終止,就是main函數(shù)退出或程序停止執(zhí)行)。下面介紹幾種常用的退出方式。
1.1 進(jìn)程/main函數(shù)退出
1.1.1 kill進(jìn)程/進(jìn)程crash
當(dāng)進(jìn)程被強(qiáng)制退出,所有它占有的資源都會(huì)還給操作系統(tǒng),而goroutine作為進(jìn)程內(nèi)的線程,資源被收回了,那么
還未結(jié)束的goroutine也會(huì)直接退出。
1.1.2 main函數(shù)結(jié)束
同理,當(dāng)主函數(shù)結(jié)束,goroutine的資源也會(huì)被收回,直接退出。
package main
import (
"fmt"
"time"
)
func routineTest() {
time.Sleep(time.Second)
fmt.Println("I'm alive")
}
func main() {
fmt.Println("start test")
go routineTest()
fmt.Println("end test")
}
# 程序輸出
start test
end test
其中g(shù)o routine里需要print出來的語句是永遠(yuǎn)也不會(huì)出現(xiàn)的。
1.2 通過channel退出
通俗的講,就是各個(gè) goroutine 之間通信的"管道",有點(diǎn)類似于 Linux 中的管道。channel 是go最推薦的goroutine 間的通信方式,同時(shí)通過 channel 來通知 goroutine 退出也是最主要的goroutine退出方式。
goroutine 雖然不能強(qiáng)制結(jié)束另外一個(gè) goroutine,但是它可以通過 channel 通知另外一個(gè) goroutine 你的表演該結(jié)束了。
package main
import (
"fmt"
"time"
)
func cancelByChannel(quit <-chan time.Time) {
for {
select {
case <-quit:
fmt.Println("cancel goroutine by channel!")
return
default:
fmt.Println("I'm alive")
time.Sleep(1 * time.Second)
}
}
}
func main() {
quit := time.After(time.Second * 10)
go cancelByChannel(quit)
time.Sleep(15 * time.Second)
fmt.Println("I'm done")
}
# 程序輸出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by channel!
I'm done
在該例子中,我們用時(shí)間定義了一個(gè)channel,當(dāng)10秒后,會(huì)給到goroutine一個(gè)退出信號(hào),然后go routine就會(huì)退出,這樣我們就實(shí)現(xiàn)了在其他線程中通知另一個(gè)線程退出的功能。
1.3 通過context退出
通過channel通知goroutine退出還有一個(gè)更好的方法就是使用context。沒錯(cuò),就是我們?cè)谌粘i_發(fā)中接口通用的第一個(gè)參數(shù)context。它本質(zhì)還是接收一個(gè)channel數(shù)據(jù),只是是通過ctx.Done()獲取。將上面的示例稍作修改即可。
package main
import (
"context"
"fmt"
"time"
)
func cancelByContext(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("cancel goroutine by context!")
return
default:
fmt.Println("I'm alive")
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go cancelByContext(ctx)
time.Sleep(10 * time.Second)
cancel()
time.Sleep(5 * time.Second)
}
# 程序輸出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!
上面的 case 中,通過 context 自帶的 WithCancel 方法將 cancel 函數(shù)傳遞出來,然后手動(dòng)調(diào)用 cancel() 函數(shù)給goroutine 傳遞了 ctx.Done() 信號(hào)。context 也提供了 context.WithTimeout() 和context.WithDeadline() 方法來更方便的傳遞特定情況下的 Done 信號(hào)。
package main
import (
"context"
"fmt"
"time"
)
func cancelByContext(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("cancel goroutine by context!")
return
default:
fmt.Println("I'm alive")
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
go cancelByContext(ctx)
time.Sleep(15 * time.Second)
}
# 程序輸出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!
上述 case 中使用了 context.WithTimeout() 來設(shè)置10秒后自動(dòng)退出,使用 context.WithDeadline() 的功能基本一樣。區(qū)別是 context.WithDeadline() 可以指定一個(gè)固定的時(shí)間點(diǎn),當(dāng)然也可以使用time.Now().Add(time.Second*10) 的方式來實(shí)現(xiàn)同 context.WithTimeout() 相同的功能。
package main
import (
"context"
"fmt"
"time"
)
func cancelByContext(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("cancel goroutine by context!")
return
default:
fmt.Println("I'm alive")
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
go cancelByContext(ctx)
time.Sleep(15 * time.Second)
}
# 程序輸出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!
注:這里需要注意的一點(diǎn)是上方兩個(gè)case中為了方便讀者理解,我將context傳回的cancel()函數(shù)拋棄掉了,實(shí)際使用中通常會(huì)加上 defer cancel() 來保證goroutine被殺死。
Context 使用原則和技巧:
不要把Context放在結(jié)構(gòu)體中,要以參數(shù)的方式傳遞,parent Context一般為Background應(yīng)該要把Context作為第一個(gè)參數(shù)傳遞給入口請(qǐng)求和出口請(qǐng)求鏈路上的每一個(gè)函數(shù),放在第一位,變量名建議都統(tǒng)一,如ctx。
給一個(gè)函數(shù)方法傳遞Context的時(shí)候,不要傳遞nil,否則在tarce追蹤的時(shí)候,就會(huì)斷了連接Context的Value相關(guān)方法應(yīng)該傳遞必須的數(shù)據(jù),不要什么數(shù)據(jù)都使用這個(gè)傳遞Context是線程安全的,可以放心的在多個(gè)goroutine中傳遞可以把一個(gè) Context 對(duì)象傳遞給任意個(gè)數(shù)的 gorotuine,對(duì)它執(zhí)行取消操作時(shí),所有 goroutine 都會(huì)接收到取消信號(hào)。
1.4 通過Panic退出
這是一種不推薦使用的方法?。?!在此給出只是提出這種操作的可能性。實(shí)際場(chǎng)景中尤其是生產(chǎn)環(huán)境請(qǐng)慎用??!
package main
import (
"context"
"fmt"
"time"
)
func cancelByPanic(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
fmt.Println("cancel goroutine by panic!")
}
}()
for i := 0; i < 5; i++ {
fmt.Println("hello cancelByPanic")
time.Sleep(1 * time.Second)
}
panic("panic")
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
go cancelByPanic(ctx)
time.Sleep(5 * time.Second)
}
# 程序輸出
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic
這里我們通過在 defer 函數(shù)中使用 recover 來捕獲 panic error 并從 panic 中拿回控制權(quán),確保程序不會(huì)再panic 展開到 goroutine 調(diào)用棧頂部后崩潰。
2、阻止goroutine退出的方法
了解到goroutine的退出方式后,我們已經(jīng)可以解決一類問題。那就是當(dāng)你需要手動(dòng)控制某個(gè)goroutine結(jié)束的時(shí)
候應(yīng)該怎么辦。但是在實(shí)際生產(chǎn)中關(guān)于goroutine還有一類問題需要解決,那就是當(dāng)你的主進(jìn)程結(jié)束時(shí),應(yīng)該如何
等待goroutine全部執(zhí)行完畢后再使主進(jìn)程退出。
2.1 通過sync.WaitGroup
package main
import (
"fmt"
)
func main() {
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
go func(s string) {
fmt.Println(s)
}(v)
}
fmt.Println("End")
}
# 程序輸出
End
以上方的 case 為例,可見我們?cè)谑裁炊疾患拥臅r(shí)候,不會(huì)等待 go func 執(zhí)行完主程序就會(huì)退出。因此下面給出使
用 WaitGroup 的方法。
package main
import (
"fmt"
"sync"
)
func main() {
// 定義 WaitGroup
var wg sync.WaitGroup
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
// 增加一個(gè) wait 任務(wù)
wg.Add(1)
go func(s string) {
// 函數(shù)結(jié)束時(shí),通知此 wait 任務(wù)已經(jīng)完成
defer wg.Done()
fmt.Println(s)
}(v)
}
// 等待所有任務(wù)完成
wg.Wait()
}
# 程序輸出
c
a
b
WaitGroup 可以理解為一個(gè) goroutine 管理者。他需要知道有多少個(gè) goroutine 在給他干活,并且在干完的時(shí)候
需要通知他干完了,否則他就會(huì)一直等,直到所有的小弟的活都干完為止。我們加上 WaitGroup 之后,程序會(huì)進(jìn)
行等待,直到它收到足夠數(shù)量的 Done() 信號(hào)為止。
WaitGroup 可被調(diào)用的方法只有三個(gè):Add() 、Done()、Wait()。
1、wg.Done() 函數(shù)實(shí)際上實(shí)現(xiàn)的是 wg.Add(-1),因此直接使用 wg.Add(-1) 是會(huì)造成同樣的結(jié)果的。在實(shí)際使
用中要注意避免誤操作,使得監(jiān)聽的 goroutine 數(shù)量出現(xiàn)誤差。
2、wg.Add() 函數(shù)可以一次性加n。但是實(shí)際使用時(shí)通常都設(shè)為1。但是wg本身的counter不能設(shè)為負(fù)數(shù)。假設(shè)你
在沒有Add到10以前,一次性 wg.Add(-10),會(huì)出現(xiàn)panic !
package main
import (
"fmt"
"sync"
)
func main() {
// 定義 WaitGroup
var wg sync.WaitGroup
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
// 增加一個(gè) wait 任務(wù)
wg.Add(1)
go func(s string) {
// 函數(shù)結(jié)束時(shí),通知此 wait 任務(wù)已經(jīng)完成
defer wg.Done()
fmt.Println(s)
}(v)
}
wg.Add(-10)
// 等待所有任務(wù)完成
wg.Wait()
}
# 程序輸出
panic: sync: negative WaitGroup countergoroutine 1 [running]:
如果你的程序?qū)懙挠袉栴},出現(xiàn)了始終等待的 waitgroup 會(huì)造成死鎖。
package main
import (
"fmt"
"sync"
)
func main() {
// 定義 WaitGroup
var wg sync.WaitGroup
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
// 增加一個(gè) wait 任務(wù)
wg.Add(1)
go func(s string) {
// 函數(shù)結(jié)束時(shí),通知此 wait 任務(wù)已經(jīng)完成
defer wg.Done()
fmt.Println(s)
}(v)
}
wg.Add(1)
// 等待所有任務(wù)完成
wg.Wait()
}
# 程序輸出
c
a
b
fatal error: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:
2.2 通過channel
package main
import "fmt"
func main() {
arr := [3]string{"a", "b", "c"}
ch := make(chan struct{}, len(arr))
for _, v := range arr {
go func(s string) {
fmt.Println(s)
ch <- struct{}{}
}(v)
}
for i := 0; i < len(arr); i++ {
<-ch
}
}
# 程序輸出
c
a
b
需要注意的是,channel 同樣會(huì)導(dǎo)致死鎖。
package main
import "fmt"
func main() {
arr := [3]string{"a", "b", "c"}
ch := make(chan struct{}, len(arr))
for _, v := range arr {
go func(s string) {
fmt.Println(s)
ch <- struct{}{}
}(v)
}
for i := 0; i < len(arr); i++ {
<-ch
}
<-ch
}
# 程序輸出
c
a
b
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]:
2.3 封裝
利用 go routine 的這一特性,我們可以將 waitGroup 等方式封裝起來,保證 go routine 在主進(jìn)程結(jié)束時(shí)會(huì)繼續(xù)執(zhí)行完。
package main
import (
"fmt"
"sync"
)
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {
wg.Add(1)
go func() {
f(args...)
wg.Done()
}()
}
func printArray(args ...interface{}) {
fmt.Println(args)
}
func main() {
// 定義 WaitGroup
var w WaitGroupWrapper
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
w.Wrap(printArray, v)
}
w.Wait()
}
# 程序輸出
[c]
[a]
[b]
還可以加上更高端一點(diǎn)的功能,增加時(shí)間、事件雙控制的 wrapper。
package main
import (
"fmt"
"sync"
"time"
)
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {
wg.Add(1)
go func() {
f(args...)
wg.Done()
}()
}
func (w *WaitGroupWrapper) WaitWithTimeout(d time.Duration) bool {
ch := make(chan struct{})
t := time.NewTimer(d)
defer t.Stop()
go func() {
w.Wait()
ch <- struct{}{}
}()
select {
case <-ch:
fmt.Println("job is done!")
return true
case <-t.C:
fmt.Println("time is out!")
return false
}
}
func printArray(args ...interface{}) {
// 如果設(shè)置3秒,那么w.Wait()需要等待的時(shí)間是3秒,而超時(shí)時(shí)間的設(shè)置是2秒,所以會(huì)超時(shí)
//3秒后會(huì)觸發(fā)time is out分支
time.Sleep(1 * time.Second)
//如果改為time.Sleep(time.Second)即會(huì)觸發(fā)job is done分支
fmt.Println(args)
}
func main() {
// 定義 WaitGroup
var w WaitGroupWrapper
arr := [3]string{"a", "b", "c"}
for _, v := range arr {
w.Wrap(printArray, v)
}
w.WaitWithTimeout(2 * time.Second)
}
# 程序輸出
[b]
[a]
[c]
job is done!
到此這篇關(guān)于Go語言并發(fā)之通知退出機(jī)制的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Go 通知退出機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
GO語言實(shí)現(xiàn)TCP服務(wù)器的示例代碼
這篇文章主要為大家詳細(xì)介紹了如何通過GO語言實(shí)現(xiàn)TCP服務(wù)器,文中的示例代碼講解詳細(xì),對(duì)我們深入了解Go語言有一定的幫助,需要的可以參考一下2023-03-03
Gin golang web開發(fā)模型綁定實(shí)現(xiàn)過程解析
這篇文章主要介紹了Gin golang web開發(fā)模型綁定實(shí)現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10
Go語言中一定要知道的切片使用注意事項(xiàng)總結(jié)
了解和掌握切片的使用注意事項(xiàng),可以避免意外的程序行為,所以本文就來和大家深入探討一下Go語言切片常見的注意事項(xiàng),希望對(duì)大家有所幫助2023-06-06
Go語言如何實(shí)現(xiàn)Benchmark函數(shù)
go想要在main函數(shù)中測(cè)試benchmark會(huì)麻煩一些,所以這篇文章主要為大家介紹了如何實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的且沒有開銷的benchmark函數(shù),希望對(duì)大家有所幫助2024-12-12
Go語言關(guān)于幾種深度拷貝(deepcopy)方法的性能對(duì)比
這篇文章主要介紹了Go語言關(guān)于幾種深度拷貝(deepcopy)方法的性能對(duì)比,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
GO語言字符串處理Strings包的函數(shù)使用示例講解
這篇文章主要為大家介紹了GO語言字符串處理Strings包的函數(shù)使用示例講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04

