欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go 控制協(xié)程(goroutine)的并發(fā)數量

 更新時間:2025年02月06日 09:26:17   作者:比豬聰明  
控制協(xié)程goroutine的并發(fā)數量是一個常見的需求,本文就來介紹一下Go 控制協(xié)程的并發(fā)數量,具有一定的參考價值,感興趣的可以了解一下

在使用協(xié)程并發(fā)處理某些任務時, 其并發(fā)數量往往因為各種因素的限制不能無限的增大. 例如網絡請求、數據庫查詢等等。

從運行效率角度考慮,在相關服務可以負載的前提下(限制最大并發(fā)數),盡可能高的并發(fā)。

在Go語言中,可以使用一些方法來控制協(xié)程(goroutine)的并發(fā)數量,以防止并發(fā)過多導致資源耗盡或性能下降

1、使用信號量(Semaphore)

可以使用 Go 語言中的 channel 來實現簡單的信號量,限制并發(fā)數量

package main

import (
	"fmt"
	"sync"
)

func worker(id int, sem chan struct{}) {
	sem <- struct{}{} // 占用一個信號量
	defer func() {
		<-sem // 方法運行結束釋放信號量
	}()

	// 執(zhí)行工作任務
	fmt.Printf("Worker %d: Working...\n", id)
}

func main() {
	concurrency := 3
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, sem)
		}(i)
	}

	wg.Wait()
	close(sem)
}

sem 是一個有緩沖的 channel,通過控制 channel 中元素的數量,實現了一個簡單的信號量機制 

 2、使用協(xié)程池

可以創(chuàng)建一個固定數量的協(xié)程池,將任務分發(fā)給這些協(xié)程執(zhí)行。 

package main

import (
	"fmt"
	"sync"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	//jobs等待主要協(xié)程往jobs放數據
	for j := range jobs {
		fmt.Printf("協(xié)程池 %d: 協(xié)程池正在工作 %d\n", id, j)
		results <- j
	}
}

func main() {
	const numJobs = 5    //協(xié)程要做的工作數量
	const numWorkers = 3 //協(xié)程池數量

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	var wg sync.WaitGroup

	// 啟動協(xié)程池
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, jobs, results)
		}(i)
	}

	// 提交任務
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}

	close(jobs)

	// 等待所有工作完成
	go func() {
		wg.Wait()
		close(results)
	}()

	// 處理結果
	for result := range results {
		fmt.Println("Result:", result)
	}
}

jobs 通道用于存儲任務,results 通道用于存儲處理結果。通過創(chuàng)建固定數量的工作協(xié)程,可以有效地控制并發(fā)數量。

3、使用其他包

Go 1.16 引入了 golang.org/x/sync/semaphore 包,它提供了一個更為靈活的信號量實現。

案例一

限制對外部API的并發(fā)請求 

假設我們有一個外部API,它對并發(fā)請求有限制,我們希望確保不超過這個限制。我們可以使用semaphore.Weighted來控制對API的并發(fā)訪問

package main

import (
	"context"
	"fmt"
	"golang.org/x/sync/semaphore"
	"sync"
	"time"
)

func main() {
	/*
		1、在并發(fā)量一定的情況下,通過改變允許并發(fā)請求數可以更快處理請求任務(在CPU夠用的前提下)
		2、sem := semaphore.NewWeighted(n),參數n就是權重量
		3、當一個協(xié)程需要獲取的單位的權重越多,運行就會慢(比如權重總量n=5個,一個協(xié)程分配了2個,跟一個協(xié)程分配1個效率是不一樣的)
		4、信號量沒有足夠的可用權重的情況發(fā)生在所有已分配的權重單位都已經被占用,即信號量的當前權重計數達到了它的總容量。在這種情況下,任何嘗試通過Acquire方法獲取更多權重的調用都將無法立即完成,從而導致調用者(通常是goroutine)阻塞,直到其他調用者釋放一些權重單位。
	*/
	/*
		1、權權重較大的任務在資源競爭時有更高的優(yōu)先級,更容易獲得執(zhí)行的機會
		2、如果當前資源足夠滿足高權重任務的需求,這些任務將立即執(zhí)行;若資源不足,則按照權重高低順序排隊等待
		3、一旦任務開始執(zhí)行,其完成的速度主要取決于任務自身的邏輯復雜度、所需資源以及系統(tǒng)的當前負載等因素,與任務在信號量中的權重無關
		3、高權重的任務并不會中斷已經在執(zhí)行的低權重任務,而是等待這些任務自行釋放資源。一旦資源釋放,等待隊列中的高權重任務將優(yōu)先被喚醒
		4、Acquire 方法會檢查當前信號量的可用資源量是否滿足請求的權重,如果滿足,則立即減少信號量的資源計數并返回,允許任務繼續(xù)執(zhí)行。如果不滿足,任務將阻塞等待,直到有足夠的資源被釋放
	*/
	// 記錄開始時間
	startTime := time.Now()
	// 假設外部API允許的最大并發(fā)請求為5(信號量的總容量是5個權重單位)
	const (
		maxConcurrentRequests = 5
	)
	sem := semaphore.NewWeighted(maxConcurrentRequests)

	var wg sync.WaitGroup

	// 模擬對外部API的10個并發(fā)請求
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(requestId int) {
			defer wg.Done()
			// 假設我們想要獲取2個單位的權重
			if err := sem.Acquire(context.Background(), 2); err != nil {
				fmt.Printf("請求 %d 無法獲取信號量: %v\n", requestId, err)
				return
			}
			defer sem.Release(2) // 請求完成后釋放信號量

			// 模擬對API的請求處理
			fmt.Printf("請求 %d 開始...\n", requestId)
			time.Sleep(2 * time.Second) // 模擬網絡延遲
			fmt.Printf("請求 %d 完成。\n", requestId)
		}(i)
	}
	wg.Wait()
	// 記錄結束時間
	endTime := time.Now()
	// 計算并打印總耗時
	fmt.Printf("程序總耗時: %v\n", endTime.Sub(startTime))
}

信號量沒有足夠的可用權重的情況發(fā)生在所有已分配的權重單位都已經被占用,即信號量的當前權重計數達到了它的總容量。在這種情況下,任何嘗試通過Acquire方法獲取更多權重的調用都將無法立即完成,從而導致調用者(通常是goroutine)阻塞,直到其他調用者釋放一些權重單位。

以下是一些導致信號量沒有足夠可用權重的具體情況:

  • 信號量初始化容量較小:如果信號量的總容量設置得較小,而并發(fā)請求的數量較大,那么很快就會出現權重不足的情況。

  • 長時間占用權重:如果某些goroutine長時間占用權重單位而不釋放,這會導致其他goroutine無法獲取到權重,即使這些goroutine只是少數。

  • 權重分配不均:在某些情況下,可能存在一些goroutine占用了不成比例的權重單位,導致其他goroutine無法獲取足夠的權重。

  • 權重釋放不及時:如果goroutine因為錯誤或異常情況提前退出,而沒有正確釋放它們所占用的權重,那么這些權重單位將不會被回收到信號量中。

  • 高頻率的請求:在短時間內有大量goroutine請求權重,即使它們請求的權重不大,累積起來也可能超過信號量的總容量。

  • 信號量權重未正確管理:如果信號量的權重管理邏輯存在缺陷,例如錯誤地釋放了過多的權重,或者在錯誤的時間點釋放權重,也可能導致可用權重不足。

為了避免信號量沒有足夠的可用權重,可以采取以下措施:

  • 合理設置信號量容量:根據資源限制和并發(fā)需求合理設置信號量的總容量。
  • 及時釋放權重:確保在goroutine完成工作后及時釋放權重。
  • 使用超時:在Acquire調用中使用超時,避免無限期地等待權重。
  • 監(jiān)控和日志記錄:監(jiān)控信號量的使用情況,并記錄關鍵信息,以便及時發(fā)現和解決問題。
  • 權重分配策略:設計合理的權重分配策略,確保權重的公平和高效分配。

通過這些措施,可以更好地管理信號量的使用,避免因權重不足導致的并發(fā)問題。

案例二

假設有一個在線視頻平臺,它需要處理不同分辨率的視頻轉碼任務。由于高清視頻轉碼比標清視頻更消耗計算資源,因此平臺希望設計一個系統(tǒng),能夠優(yōu)先處理更多標清視頻轉碼請求,同時又不完全阻塞高清視頻的轉碼,以保持整體服務質量和資源的有效利用。

package main

import (
	"fmt"
	"golang.org/x/net/context"
	"golang.org/x/sync/semaphore"
	"runtime"
	"sync"
	"time"
)

// VideoTranscodeJob 視頻轉碼任務
type VideoTranscodeJob struct {
	resolution string
	weight     int64
}

func main() {
	cpuCount := runtime.NumCPU()
	fmt.Printf("當前CPU個數%v\n", cpuCount)
	/*
		1、權權重較大的任務在資源競爭時有更高的優(yōu)先級,更容易獲得執(zhí)行的機會
		2、如果當前資源足夠滿足高權重任務的需求,這些任務將立即執(zhí)行;若資源不足,則按照權重高低順序排隊等待
		3、一旦任務開始執(zhí)行,其完成的速度主要取決于任務自身的邏輯復雜度、所需資源以及系統(tǒng)的當前負載等因素,與任務在信號量中的權重無關
		3、高權重的任務并不會中斷已經在執(zhí)行的低權重任務,而是等待這些任務自行釋放資源。一旦資源釋放,等待隊列中的高權重任務將優(yōu)先被喚醒
		4、Acquire 方法會檢查當前信號量的可用資源量是否滿足請求的權重,如果滿足,則立即減少信號量的資源計數并返回,允許任務繼續(xù)執(zhí)行。如果不滿足,任務將阻塞等待,直到有足夠的資源被釋放
	*/
	// 初始化兩個信號量,一個用于標清,一個用于高清,假設總共有8個CPU核心可用
	normalSem := semaphore.NewWeighted(6)  // 標清任務,分配6個單位權重,因為它們消耗資源較少
	highDefSem := semaphore.NewWeighted(2) // 高清任務,分配2個單位權重,因為它們更消耗資源

	var wg sync.WaitGroup

	//假設有20個需要轉碼的視頻
	videoJobs := []VideoTranscodeJob{
		{"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2},
		{"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2},
		{"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2},
		{"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2},
	}

	for _, job := range videoJobs {
		wg.Add(1)
		go func(job VideoTranscodeJob) {
			defer wg.Done()

			var sem *semaphore.Weighted
			switch job.resolution {
			case "SD":
				sem = normalSem //分配權重大,當前為6,任務在獲取執(zhí)行機會上有優(yōu)勢,但并不直接意味著執(zhí)行速度快
			case "HD":
				sem = highDefSem
			default:
				panic("無效的分辨率")
			}

			if err := sem.Acquire(context.Background(), job.weight); err != nil {
				fmt.Printf("名為 %s 視頻無法獲取信號量: %v\n", job.resolution, err)
				return
			}
			defer sem.Release(job.weight) //釋放權重對應的信號量

			// 模擬轉碼任務執(zhí)行
			fmt.Printf("轉碼 %s 視頻 (權重: %d)...\n", job.resolution, job.weight)
			//通過利用VideoTranscodeJob的weight值來模擬轉碼時間的長短,HD用時長則設置2比SD的1大,*時間就自然長,運行就時間長
			time.Sleep(time.Duration(job.weight*100) * time.Millisecond) // 模擬不同分辨率視頻轉碼所需時間
			fmt.Printf("------------------------%s 視頻轉碼完成。。。\n", job.resolution)
		}(job)
	}
	wg.Wait()
}

標清(SD)和高清(HD),分別分配了不同的權重(1和2)。通過創(chuàng)建兩個不同權重的信號量,我們可以控制不同類型任務的同時執(zhí)行數量,從而優(yōu)先保證標清視頻的快速處理,同時也確保高清視頻能夠在不影響系統(tǒng)穩(wěn)定性的情況下進行轉碼。這展示了帶權重的并發(fā)控制如何幫助在資源有限的情況下優(yōu)化任務調度和執(zhí)行效率。

 注意:對協(xié)程分配的權重單位數不能大于對應上下文semaphore.NewWeighted(n)中參數n的單位數

案例三

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

type weightedTask struct {
	id     int
	weight int64
}

func main() {
	const (
		maxTotalWeight = 20 // 最大總權重
	)
	sem := semaphore.NewWeighted(maxTotalWeight)

	var wg sync.WaitGroup
	tasksCh := make(chan weightedTask, 10)

	// 發(fā)送任務
	for i := 1; i <= 10; i++ {
		tasksCh <- weightedTask{id: i, weight: int64(i)} // 假設任務ID即為其權重
	}

	close(tasksCh)

	// 啟動任務處理器
	for task := range tasksCh {
		wg.Add(1)
		go func(task weightedTask) {
			defer wg.Done()

			if err := sem.Acquire(context.Background(), int64(task.id)); err != nil {
				fmt.Printf("任務 %d 無法獲取信號量: %v\n", task.id, err)
				return
			}
			defer sem.Release(int64(task.id)) //釋放

			// 模擬任務執(zhí)行
			fmt.Printf("任務 %d (權重: %d) 正在運行...\n", task.id, task.weight)
			time.Sleep(time.Duration(task.weight*100) * time.Millisecond) // 示例中簡單用時間模擬權重影響
			fmt.Printf("任務 %d 完成.\n", task.id)
		}(task)
	}
	wg.Wait()
}

總結

選擇哪種方法取決于具體的應用場景和需求。使用信號量是一種簡單而靈活的方法,而協(xié)程池則更適用于需要批量處理任務的情況。golang.org/x/sync/semaphore 包提供了一個標準庫外的更靈活的信號量實現  

到此這篇關于Go 控制協(xié)程(goroutine)的并發(fā)數量的文章就介紹到這了,更多相關Go 控制協(xié)程并發(fā)數量內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • go嵌套匿名結構體的初始化詳解

    go嵌套匿名結構體的初始化詳解

    這篇文章主要介紹了go嵌套匿名結構體的初始化詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • go日志系統(tǒng)logrus顯示文件和行號的操作

    go日志系統(tǒng)logrus顯示文件和行號的操作

    這篇文章主要介紹了go日志系統(tǒng)logrus顯示文件和行號的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • golang使用信號量熱更新的實現示例

    golang使用信號量熱更新的實現示例

    這篇文章主要介紹了golang使用信號量熱更新的實現示例,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-04-04
  • Gorm存在時更新,不存在時創(chuàng)建的問題

    Gorm存在時更新,不存在時創(chuàng)建的問題

    這篇文章主要介紹了Gorm存在時更新,不存在時創(chuàng)建的問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • go HTTP2 的頭部壓縮算法hpack實現詳解

    go HTTP2 的頭部壓縮算法hpack實現詳解

    這篇文章主要為大家介紹了go HTTP2 的頭部壓縮算法hpack實現詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-10-10
  • golang 監(jiān)聽服務的信號,實現平滑啟動,linux信號說明詳解

    golang 監(jiān)聽服務的信號,實現平滑啟動,linux信號說明詳解

    這篇文章主要介紹了golang 監(jiān)聽服務的信號,實現平滑啟動,linux信號說明詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-05-05
  • 使用?gomonkey?Mock?函數及方法示例詳解

    使用?gomonkey?Mock?函數及方法示例詳解

    在 Golang 語言中,寫單元測試的時候,不可避免的會涉及到對其他函數及方法的 Mock,即在假設其他函數及方法響應預期結果的同時,校驗被測函數的響應是否符合預期,這篇文章主要介紹了使用?gomonkey?Mock?函數及方法,需要的朋友可以參考下
    2022-06-06
  • Go語言掃描目錄并獲取相關信息的方法

    Go語言掃描目錄并獲取相關信息的方法

    這篇文章主要介紹了Go語言掃描目錄并獲取相關信息的方法,實例分析了Go語言操作目錄及文件的技巧,需要的朋友可以參考下
    2015-03-03
  • Go中sync?包Cond使用場景分析

    Go中sync?包Cond使用場景分析

    Cond?是和某個條件相關,在條件還沒有滿足的時候,所有等待這個條件的協(xié)程都會被阻塞住,只有這個條件滿足的時候,等待的協(xié)程才可能繼續(xù)進行下去,這篇文章主要介紹了Go中sync?包的Cond使用場景分析,需要的朋友可以參考下
    2023-03-03
  • Golang設計模式中的橋接模式詳細講解

    Golang設計模式中的橋接模式詳細講解

    橋接模式是一種結構型設計模式,通過橋接模式可以將抽象部分和它的實現部分分離,本文主要介紹了GoLang橋接模式,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2023-01-01

最新評論