Go使用Weighted實現(xiàn)資源管理
1. 簡介
本文將介紹 Go 語言中的 Weighted 并發(fā)原語,包括 Weighted 的基本使用方法、實現(xiàn)原理、使用注意事項等內容。能夠更好地理解和應用 Weighted 來實現(xiàn)資源的管理,從而提高程序的穩(wěn)定性。
2. 問題引入
在微服務架構中,我們的服務節(jié)點負責接收其他節(jié)點的請求,并提供相應的功能和數(shù)據(jù)。比如賬戶服務,其他服務需要獲取賬戶信息,都會通過rpc請求向賬戶服務發(fā)起請求。
這些服務節(jié)點通常以集群的方式部署在服務器上,用于處理大量的并發(fā)請求。每個服務器都有其處理能力的上限,超過該上限可能導致性能下降甚至崩潰。
在部署服務時,通常會評估服務的并發(fā)量,并為其分配適當?shù)馁Y源以處理預期的請求負載。然而,在微服務架構中,存在著上游服務請求下游服務的場景。如果上游服務在某些情況下沒有正確考慮并發(fā)量,或者由于某些異常情況導致大量請求發(fā)送給下游服務,那么下游服務可能面臨超過其處理能力的問題。這可能導致下游服務的響應時間增加,甚至無法正常處理請求,進而影響整個系統(tǒng)的穩(wěn)定性和可用性。下面用一個簡單的代碼來說明一下:
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
// 啟動下游服務,用于處理請求
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// 模擬下游服務的處理邏輯
// ...
// 完成請求處理后,從等待組中刪除一個等待
wg.Done()
})
// 啟動下游服務的 HTTP 服務器
http.ListenAndServe(":8080", nil)
}
這里啟動一個簡單的HTTP服務器,由其來模擬下游服務,來接收上游服務的請求。下面我們啟動一個簡單的程序,由其來模擬上游服務發(fā)送請求:
func main() {
// 創(chuàng)建一個等待組,用于等待所有請求完成
var wg sync.WaitGroup
// 模擬上游服務發(fā)送大量請求給下游服務
go func() {
for i := 0; i < 1000000; i++ {
wg.Add(1)
go sendRequest(&wg)
}
}()
// 等待所有請求完成
wg.Wait()
}
func sendRequest(wg *sync.WaitGroup) {
// 模擬上游服務發(fā)送請求給下游服務
resp, err := http.Get("http://localhost:8080/")
if err != nil {
fmt.Println("請求失敗:", err)
} else {
fmt.Println("請求成功:", resp.Status)
}
// 請求完成后,通知等待組
wg.Done()
}
這里,我們同時啟動了1000000個協(xié)程同時往HTTP服務器發(fā)送請求,如果服務器配置不夠高,亦或者是請求量更多的情況下,已經超過了服務器的處理上限,服務器沒有主夠的資源去處理這些請求,此時將有可能直接將服務器打掛掉,服務直接不可用。在這種情況下,如果由于上游服務的問題,導致下游服務,甚至整個鏈路的系統(tǒng)都直接崩潰,這個是不合理的,此時需要有一些手段保護下游服務由于異常流量導致整個系統(tǒng)的崩潰。
這里對上面的場景進行分析,可以發(fā)現(xiàn),此時是由于上游服務大量請求的過來,而當前服務并沒有足夠的資源去處理這些請求,但是并沒有對其加以限制,而是繼續(xù)處理,最終導致了整個系統(tǒng)的不可用。那么此時就應該進行限流,對并發(fā)請求量進行控制,對服務器能夠處理的并發(fā)數(shù)進行合理評估,當并發(fā)請求數(shù)超過了限制,此時應該直接拒絕其訪問,避免整個系統(tǒng)的不可用。
那問題來了,go語言中,有什么方法能夠實現(xiàn)資源的管理,如果沒有足夠的資源,此時將直接返回,不對請求進行處理呢?其實go語言中有Weighted類型,在這種場景還挺合適的。下面我們將對其進行介紹。
3. 基本使用
3.1 基本介紹
Weighted 是 Go 語言中 golang.org/x/sync包中的一種類型,用于限制并發(fā)訪問某個資源的數(shù)量。它提供了一種機制,允許調用者以不同的權重請求訪問資源,并在資源可用時進行授予。
Weighted的定義如下,提供了Acquire,TryAcquire,Release三個方法:
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
func (s *Weighted) Acquire(ctx context.Context, n int64) error{}
func (s *Weighted) TryAcquire(n int64) bool{}
func (s *Weighted) Release(n int64) {}- Acquire: 以權重 n 請求獲取資源,阻塞直到資源可用或上下文 ctx 結束。
- TryAcquire: 嘗試以權重 n 獲取信號量,如果成功則返回 true,否則返回 false,并保持信號量不變。
- Release:釋放具有權重 n 的信號量。
3.2 權重說明
有時候,不同請求對資源的消耗是不同的。通過設置權重,你可以更好地控制不同請求對資源的使用情況。例如,某些請求可能需要更多的計算資源或更長的處理時間,你可以設置較高的權重來確保它們能夠獲取到足夠的資源。
其次就是權重大只是代表著請求需要使用到的資源多,對于優(yōu)先級并不會有作用。在Weighted 中,資源的許可是以先進先出(FIFO)的順序分配的,而不是根據(jù)權重來決定獲取的優(yōu)先級。當有多個請求同時等待獲取資源時,它們會按照先后順序依次獲取資源的許可。
假設先請求權重為 1 的資源,然后再請求權重為 2 的資源。如果當前可用的資源許可足夠滿足兩個請求的總權重,那么先請求的權重為 1 的資源會先獲取到許可,然后是后續(xù)請求的權重為 2 的資源。
w.Acquire(context.Background(), 1) // 權重為 1 的請求先獲取到資源許可 w.Acquire(context.Background(), 2) // 權重為 2 的請求在權重為 1 的請求之后獲取到資源許可
3.3 基本使用
當使用Weighted來控制資源的并發(fā)訪問時,通常需要以下幾個步驟:
- 創(chuàng)建Weighted實例,定義好最大資源數(shù)
- 當需要資源時,調用Acquire方法占據(jù)資源
- 當處理完成之后,調用Release方法釋放資源
下面是一個簡單的代碼的示例,展示了如何使用Weighted實現(xiàn)資源控制:
func main() {
// 1. 創(chuàng)建一個信號量實例,設置最大并發(fā)數(shù)
sem := semaphore.NewWeighted(10)
// 具體處理請求的函數(shù)
handleRequest := func(id int) {
// 2. 調用Acquire嘗試獲取資源
err := sem.Acquire(context.Background(), 1)
if err != nil {
fmt.Printf("Goroutine %d failed to acquire resource\n", id)
}
// 3. 成功獲取資源,使用defer,在任務執(zhí)行完之后,自動釋放資源
defer sem.Release(1)
// 執(zhí)行業(yè)務邏輯
return
}
// 模擬并發(fā)請求
for i := 0; i < 20; i++ {
go handleRequest(i)
}
time.Sleep(20 * time.Second)
}首先,調用NewWeighted方法創(chuàng)建一個信號量實例,設置最大并發(fā)數(shù)為10。然后在每次請求處理前調用Acquire方法嘗試獲取資源,成功獲取資源后,使用defer關鍵字,在任務執(zhí)行完后自動釋放資源,調用Release方法釋放一個資源。
保證最多同時有10個協(xié)程獲取資源。如果有更多的協(xié)程嘗試獲取資源,它們會等待其他協(xié)程釋放資源后再進行獲取。
4. 實現(xiàn)原理
4.1 設計初衷
Weighted類型的設計初衷是為了在并發(fā)環(huán)境中實現(xiàn)對資源的控制和限制。它提供了一種簡單而有效的機制,允許在同一時間內只有一定數(shù)量的并發(fā)操作可以訪問或使用特定的資源。
4.2 基本原理
Weighted類型的基本實現(xiàn)原理是基于計數(shù)信號量的概念。計數(shù)信號量是一種用于控制并發(fā)訪問的同步原語,它維護一個可用資源的計數(shù)器。在Weighted中,該計數(shù)器表示可用的資源數(shù)量。
當一個任務需要獲取資源時,它會調用Acquire方法。該方法首先會檢查當前可用資源的數(shù)量,如果大于零,則表示有可用資源,并將計數(shù)器減一,任務獲取到資源,并繼續(xù)執(zhí)行。如果當前可用資源的數(shù)量為零,則任務會被阻塞,直到有其他任務釋放資源。
當一個任務完成對資源的使用后,它會調用Release方法來釋放資源。該方法會將計數(shù)器加一,表示資源已經可用,其他被阻塞的任務可以繼續(xù)獲取資源并執(zhí)行。
通過這種方式,Weighted實現(xiàn)了對資源的限制和控制。它確保在同一時間內只有一定數(shù)量的并發(fā)任務可以訪問資源,超過限制的任務會被阻塞,直到有其他任務釋放資源。這樣可以有效地避免資源過度使用和競爭,保證系統(tǒng)的穩(wěn)定性和性能。
4.3 代碼實現(xiàn)
4.3.1 結構體定義
Weighted的結構體定義如下:
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}- size:表示資源的總數(shù)量,即可以同時獲取的最大資源數(shù)量。
- cur:表示當前已經被獲取的資源數(shù)量。
- mu:用于保護Weighted類型的互斥鎖,確保并發(fā)安全性。
- waiters:使用雙向鏈表來存儲等待獲取資源的任務。
4.3.2 Acquire方法
Acquire方法將獲取指定數(shù)量的資源。如果當前可用資源數(shù)量不足,調用此方法的任務將被阻塞,并加入到等待隊列中。
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
// 1. 使用互斥鎖s.mu對Weighted類型進行加鎖,確保并發(fā)安全性。
s.mu.Lock()
// size - cur 代表剩余可用資源數(shù),如果大于請求資源數(shù)n, 此時代表剩余可用資源 大于 需要的資源數(shù)
// 其次,Weighted資源分配的順序是FIFO,如果等待隊列不為空,當前請求就需要自動放到隊列最后面
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// s.size 代表最大資源數(shù),如果需要的資源數(shù) 大于 最大資源數(shù),此時直接返回錯誤
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
// 這里代表著當前暫時獲取不到資源,此時將創(chuàng)建一個waiter對象放到等待隊列最后
ready := make(chan struct{})
// waiter對象中包含需要獲取的資源數(shù)量n和通知通道ready。
w := waiter{n: n, ready: ready}
// 將waiter對象放到隊列最后
elem := s.waiters.PushBack(w)
// 釋放鎖,讓其他請求進來
s.mu.Unlock()
select {
// 如果ctx.Done()通道被關閉,表示上下文已取消,任務需要返回錯誤。
case <-ctx.Done():
err := ctx.Err()
// 新獲取鎖,檢查是否已經成功獲取資源。如果成功獲取資源,將錯誤置為nil,表示獲取成功;
s.mu.Lock()
select {
// 通過判斷ready channel是否接收到信號,從而來判斷是否成功獲取資源
case <-ready:
err = nil
default:
// 判斷是否是等待隊列中第一個元素
isFront := s.waiters.Front() == elem
// 將該請求從等待隊列中移除
s.waiters.Remove(elem)
// 如果是第一個等待對象,同時還有剩余資源,喚醒后面的waiter。說不定后面的waiter剛好符合條件
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
// ready通道接收到數(shù)據(jù),代表此時已經成功占據(jù)到資源了
case <-ready:
return nil
}
}Weighted對象用來控制可用資源的數(shù)量。它有兩個重要的字段,cur和size,分別表示當前可用的資源數(shù)量和總共可用的資源數(shù)量。
當一個請求通過Acquire方法請求資源時,首先會檢查剩余資源數(shù)量是否足夠,并且等待隊列中沒有其他請求在等待資源。如果滿足這兩個條件,請求就可以成功獲取到資源。
如果剩余資源數(shù)量不足以滿足請求,那么一個waiter的對象會被創(chuàng)建并放入等待隊列中。waiter對象包含了請求需要的資源數(shù)量n和一個用于通知的通道ready。當其他請求調用Release方法釋放資源時,它們會檢查等待隊列中的waiter對象是否滿足資源需求,如果滿足,就會將資源分配給該waiter對象,并通過ready通道來通知它可以執(zhí)行業(yè)務邏輯了。
即使剩余資源數(shù)量大于請求所需數(shù)量,如果等待隊列中存在等待的請求,新的請求也會被放入等待隊列中,而不管資源是否足夠。這可能導致一些請求長時間等待資源,導致資源的浪費和延遲。因此,在使用Weighted進行資源控制時,需要謹慎評估資源配額,并避免資源饑餓的情況發(fā)生,以免影響系統(tǒng)的性能和響應能力。
4.3.3 Release方法
Release方法將釋放指定數(shù)量的資源。當資源被釋放時,會檢查等待隊列中的任務。它從隊頭開始逐個檢查等待的元素,并嘗試為它們分配資源,直到最后一個不滿足資源條件的元素為止。
func (s *Weighted) Release(n int64) {
// 1. 使用互斥鎖s.mu對Weighted類型進行加鎖,確保并發(fā)安全性。
s.mu.Lock()
// 2. 釋放資源
s.cur -= n
// 3. 異常情況處理
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
// 4. 喚醒等待任務
s.notifyWaiters()
s.mu.Unlock()
}可以看到,Release方法實現(xiàn)相對比較簡單,釋放資源后,便直接調用notifyWaiters方法喚醒處于等待狀態(tài)的任務。下面來看看notifyWaiters方法的具體實現(xiàn):
func (s *Weighted) notifyWaiters() {
for {
// 獲取隊頭元素
next := s.waiters.Front()
// 已經沒有處于等待狀態(tài)的協(xié)程,此時直接返回
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
// 如果資源不滿足要求 當前waiter的要求,此時直接返回
if s.size-s.cur < w.n {
break
}
// 否則占據(jù)waiter需要的資源數(shù)
s.cur += w.n
// 移除等待元素
s.waiters.Remove(next)
// 喚醒處于等待狀態(tài)的任務,Acquire方法會 <- ready 來等待信號的到來
close(w.ready)
}
}notifyWaiters方法會從隊頭開始獲取元素,判斷當前資源的剩余數(shù),是否滿足waiter的要求,如果滿足的話,此時先占據(jù)該waiter需要的資源,之后再將其從等待隊列中移除,最后調用close方法,喚醒處于等待狀態(tài)的任務。 之后,再繼續(xù)隊列中取出元素,判斷是否滿足條件,循環(huán)反復,直到不滿足waiter的條件為止。
4.3.4 TryAcquire方法
TryAcquire方法將嘗試獲取指定數(shù)量的資源,但不會阻塞。如果可用資源不足,它會立即返回一個錯誤,而不是阻塞等待。實現(xiàn)比較簡單,只是簡單檢查當前資源數(shù)是否滿足要求而已,具體如下:
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}5. 注意事項
5.1 及時釋放資源
當使用Weighted來管理資源時,確保在使用完資源后,及時調用Release方法釋放資源。如果不這樣做,將會導致資源泄漏,最終導致所有的請求都將無法被處理。下面展示一個簡單的代碼說明:
package main
import (
"fmt"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
func main() {
sem := semaphore.NewWeighted(5) // 創(chuàng)建一個最大并發(fā)數(shù)為5的Weighted實例
// 模擬使用資源的任務
task := func(id int) {
//1. 成功獲取資源
if err := sem.Acquire(context.Background(), 1); err != nil {
fmt.Printf("Task %d failed to acquire resource: %s\n", id, err)
return
}
// 2. 任務處理完成之后,資源沒有被釋放
// defer sem.Release(1) // 使用defer確保在任務完成后釋放資源
}
// 啟動多個任務并發(fā)執(zhí)行
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task(id)
}(i)
}
wg.Wait() // 等待所有任務完成
}在上面的代碼中,我們使用Weighted來控制最大并發(fā)數(shù)為5。我們在任務中沒有調用sem.Release(1)釋放資源,這些資源將一直被占用,后面啟動的5個任務將永遠無法獲取到資源,此時將永遠不會繼續(xù)執(zhí)行下去。因此,務必在使用完資源后及時調用Release方法釋放資源,以確保資源的正確回收和釋放,保證系統(tǒng)的穩(wěn)定性和性能。
而且這里最好使用defer語句來實現(xiàn)資源的釋放,避免Release函數(shù)在某些異常場景下無法被執(zhí)行到。
5.2 合理設置并發(fā)數(shù)
Weighted只是提供了一種管理資源的手段,具體的并發(fā)數(shù)還需要開發(fā)人員自行根據(jù)系統(tǒng)的實際需求和資源限制,合理設置Weighted實例的最大并發(fā)數(shù)。過大的并發(fā)數(shù)可能導致資源過度競爭,而過小的并發(fā)數(shù)可能限制了系統(tǒng)的吞吐量。
具體操作可以到線上預發(fā)布環(huán)境,不斷調整觀察,獲取到一個最合適的并發(fā)數(shù)。
5.3 考慮Weighted是否適用于當前場景
Weighted 類型可以用于限制并發(fā)訪問資源的數(shù)量,但它也存在一些潛在的缺點,需要根據(jù)具體的應用場景和需求權衡利弊。
首先是內存開銷,Weighted 類型使用一個 sync.Mutex 以及一個 list.List 來管理等待隊列,這可能會占用一定的內存開銷。對于大規(guī)模的并發(fā)處理,特別是在限制極高的情況下,可能會影響系統(tǒng)的內存消耗。
其次是Weighted 類型一旦初始化,最大并發(fā)數(shù)是固定的,無法在運行時動態(tài)調整。如果你的應用程序需要根據(jù)負載情況動態(tài)調整并發(fā)限制,可能需要使用其他機制或實現(xiàn)。
而且Weighted是嚴格按照FIFO請求順序來分配資源的,當某些請求的權重過大時,可能會導致其他請求饑餓,即長時間等待資源。
最后,則是由于 Weighted 類型使用了互斥鎖來保護共享狀態(tài),因此在高并發(fā)情況下,爭奪鎖可能成為性能瓶頸,影響系統(tǒng)的吞吐量。
因此,在使用 Weighted 類型時,需要根據(jù)具體的應用場景和需求權衡利弊,從而來決定是否使用Weighted來實現(xiàn)資源的管理控制。
6. 總結
本文介紹了一種解決系統(tǒng)中資源管理問題的解決方案Weighted。本文從問題引出,詳細介紹了Weighted的特點和使用方法。通過了解Weighted的設計初衷和實現(xiàn)原理,讀者可以更好地理解其工作原理。
同時,文章提供了使用Weighted時需要注意的事項,如及時釋放資源、合理設置并發(fā)數(shù)等,從而幫助讀者避免潛在的問題,以及能夠在比較合適的場景下使用到Weighted類型實現(xiàn)資源管理?;诖耍覀兺瓿闪藢eighted的介紹,希望對你有所幫助。
到此這篇關于Go使用Weighted實現(xiàn)資源管理的文章就介紹到這了,更多相關Go實現(xiàn)資源管理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
go?tool?pprof?參數(shù)?'-base'?和?'-diff_base&
這篇文章主要介紹了go?tool?pprof?參數(shù)?'-base'?和?'-diff_base'之間的區(qū)別,兩個參數(shù)都是用于計算當前?profile文件減去基準profile文件所獲得的差值,用這個差值生成一個新的profile文件,本文給大家介紹的非常詳細,需要的朋友可以參考下2023-05-05
golang?防緩存擊穿singleflight的實現(xiàn)
本文主要介紹了golang?防緩存擊穿singleflight的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-08-08
在Golang中使用http.FileServer返回靜態(tài)文件的操作
這篇文章主要介紹了在Golang中使用http.FileServer返回靜態(tài)文件的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12

