Go使用Weighted實現(xiàn)資源管理
1. 簡介
本文將介紹 Go 語言中的 Weighted 并發(fā)原語,包括 Weighted 的基本使用方法、實現(xiàn)原理、使用注意事項等內(nèi)容。能夠更好地理解和應(yīng)用 Weighted 來實現(xiàn)資源的管理,從而提高程序的穩(wěn)定性。
2. 問題引入
在微服務(wù)架構(gòu)中,我們的服務(wù)節(jié)點負責接收其他節(jié)點的請求,并提供相應(yīng)的功能和數(shù)據(jù)。比如賬戶服務(wù),其他服務(wù)需要獲取賬戶信息,都會通過rpc請求向賬戶服務(wù)發(fā)起請求。
這些服務(wù)節(jié)點通常以集群的方式部署在服務(wù)器上,用于處理大量的并發(fā)請求。每個服務(wù)器都有其處理能力的上限,超過該上限可能導致性能下降甚至崩潰。
在部署服務(wù)時,通常會評估服務(wù)的并發(fā)量,并為其分配適當?shù)馁Y源以處理預期的請求負載。然而,在微服務(wù)架構(gòu)中,存在著上游服務(wù)請求下游服務(wù)的場景。如果上游服務(wù)在某些情況下沒有正確考慮并發(fā)量,或者由于某些異常情況導致大量請求發(fā)送給下游服務(wù),那么下游服務(wù)可能面臨超過其處理能力的問題。這可能導致下游服務(wù)的響應(yīng)時間增加,甚至無法正常處理請求,進而影響整個系統(tǒng)的穩(wěn)定性和可用性。下面用一個簡單的代碼來說明一下:
package main import ( "fmt" "net/http" "sync" ) func main() { // 啟動下游服務(wù),用于處理請求 http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { // 模擬下游服務(wù)的處理邏輯 // ... // 完成請求處理后,從等待組中刪除一個等待 wg.Done() }) // 啟動下游服務(wù)的 HTTP 服務(wù)器 http.ListenAndServe(":8080", nil) }
這里啟動一個簡單的HTTP服務(wù)器,由其來模擬下游服務(wù),來接收上游服務(wù)的請求。下面我們啟動一個簡單的程序,由其來模擬上游服務(wù)發(fā)送請求:
func main() { // 創(chuàng)建一個等待組,用于等待所有請求完成 var wg sync.WaitGroup // 模擬上游服務(wù)發(fā)送大量請求給下游服務(wù) go func() { for i := 0; i < 1000000; i++ { wg.Add(1) go sendRequest(&wg) } }() // 等待所有請求完成 wg.Wait() } func sendRequest(wg *sync.WaitGroup) { // 模擬上游服務(wù)發(fā)送請求給下游服務(wù) resp, err := http.Get("http://localhost:8080/") if err != nil { fmt.Println("請求失敗:", err) } else { fmt.Println("請求成功:", resp.Status) } // 請求完成后,通知等待組 wg.Done() }
這里,我們同時啟動了1000000個協(xié)程同時往HTTP服務(wù)器發(fā)送請求,如果服務(wù)器配置不夠高,亦或者是請求量更多的情況下,已經(jīng)超過了服務(wù)器的處理上限,服務(wù)器沒有主夠的資源去處理這些請求,此時將有可能直接將服務(wù)器打掛掉,服務(wù)直接不可用。在這種情況下,如果由于上游服務(wù)的問題,導致下游服務(wù),甚至整個鏈路的系統(tǒng)都直接崩潰,這個是不合理的,此時需要有一些手段保護下游服務(wù)由于異常流量導致整個系統(tǒng)的崩潰。
這里對上面的場景進行分析,可以發(fā)現(xiàn),此時是由于上游服務(wù)大量請求的過來,而當前服務(wù)并沒有足夠的資源去處理這些請求,但是并沒有對其加以限制,而是繼續(xù)處理,最終導致了整個系統(tǒng)的不可用。那么此時就應(yīng)該進行限流,對并發(fā)請求量進行控制,對服務(wù)器能夠處理的并發(fā)數(shù)進行合理評估,當并發(fā)請求數(shù)超過了限制,此時應(yīng)該直接拒絕其訪問,避免整個系統(tǒng)的不可用。
那問題來了,go語言中,有什么方法能夠?qū)崿F(xiàn)資源的管理,如果沒有足夠的資源,此時將直接返回,不對請求進行處理呢?其實go語言中有Weighted類型,在這種場景還挺合適的。下面我們將對其進行介紹。
3. 基本使用
3.1 基本介紹
Weighted 是 Go 語言中 golang.org/x/sync包中的一種類型,用于限制并發(fā)訪問某個資源的數(shù)量。它提供了一種機制,允許調(diào)用者以不同的權(quán)重請求訪問資源,并在資源可用時進行授予。
Weighted的定義如下,提供了Acquire,TryAcquire,Release三個方法:
type Weighted struct { size int64 cur int64 mu sync.Mutex waiters list.List } func (s *Weighted) Acquire(ctx context.Context, n int64) error{} func (s *Weighted) TryAcquire(n int64) bool{} func (s *Weighted) Release(n int64) {}
- Acquire: 以權(quán)重 n 請求獲取資源,阻塞直到資源可用或上下文 ctx 結(jié)束。
- TryAcquire: 嘗試以權(quán)重 n 獲取信號量,如果成功則返回 true,否則返回 false,并保持信號量不變。
- Release:釋放具有權(quán)重 n 的信號量。
3.2 權(quán)重說明
有時候,不同請求對資源的消耗是不同的。通過設(shè)置權(quán)重,你可以更好地控制不同請求對資源的使用情況。例如,某些請求可能需要更多的計算資源或更長的處理時間,你可以設(shè)置較高的權(quán)重來確保它們能夠獲取到足夠的資源。
其次就是權(quán)重大只是代表著請求需要使用到的資源多,對于優(yōu)先級并不會有作用。在Weighted 中,資源的許可是以先進先出(FIFO)的順序分配的,而不是根據(jù)權(quán)重來決定獲取的優(yōu)先級。當有多個請求同時等待獲取資源時,它們會按照先后順序依次獲取資源的許可。
假設(shè)先請求權(quán)重為 1 的資源,然后再請求權(quán)重為 2 的資源。如果當前可用的資源許可足夠滿足兩個請求的總權(quán)重,那么先請求的權(quán)重為 1 的資源會先獲取到許可,然后是后續(xù)請求的權(quán)重為 2 的資源。
w.Acquire(context.Background(), 1) // 權(quán)重為 1 的請求先獲取到資源許可 w.Acquire(context.Background(), 2) // 權(quán)重為 2 的請求在權(quán)重為 1 的請求之后獲取到資源許可
3.3 基本使用
當使用Weighted來控制資源的并發(fā)訪問時,通常需要以下幾個步驟:
- 創(chuàng)建Weighted實例,定義好最大資源數(shù)
- 當需要資源時,調(diào)用Acquire方法占據(jù)資源
- 當處理完成之后,調(diào)用Release方法釋放資源
下面是一個簡單的代碼的示例,展示了如何使用Weighted實現(xiàn)資源控制:
func main() { // 1. 創(chuàng)建一個信號量實例,設(shè)置最大并發(fā)數(shù) sem := semaphore.NewWeighted(10) // 具體處理請求的函數(shù) handleRequest := func(id int) { // 2. 調(diào)用Acquire嘗試獲取資源 err := sem.Acquire(context.Background(), 1) if err != nil { fmt.Printf("Goroutine %d failed to acquire resource\n", id) } // 3. 成功獲取資源,使用defer,在任務(wù)執(zhí)行完之后,自動釋放資源 defer sem.Release(1) // 執(zhí)行業(yè)務(wù)邏輯 return } // 模擬并發(fā)請求 for i := 0; i < 20; i++ { go handleRequest(i) } time.Sleep(20 * time.Second) }
首先,調(diào)用NewWeighted方法創(chuàng)建一個信號量實例,設(shè)置最大并發(fā)數(shù)為10。然后在每次請求處理前調(diào)用Acquire方法嘗試獲取資源,成功獲取資源后,使用defer關(guān)鍵字,在任務(wù)執(zhí)行完后自動釋放資源,調(diào)用Release方法釋放一個資源。
保證最多同時有10個協(xié)程獲取資源。如果有更多的協(xié)程嘗試獲取資源,它們會等待其他協(xié)程釋放資源后再進行獲取。
4. 實現(xiàn)原理
4.1 設(shè)計初衷
Weighted類型的設(shè)計初衷是為了在并發(fā)環(huán)境中實現(xiàn)對資源的控制和限制。它提供了一種簡單而有效的機制,允許在同一時間內(nèi)只有一定數(shù)量的并發(fā)操作可以訪問或使用特定的資源。
4.2 基本原理
Weighted類型的基本實現(xiàn)原理是基于計數(shù)信號量的概念。計數(shù)信號量是一種用于控制并發(fā)訪問的同步原語,它維護一個可用資源的計數(shù)器。在Weighted中,該計數(shù)器表示可用的資源數(shù)量。
當一個任務(wù)需要獲取資源時,它會調(diào)用Acquire方法。該方法首先會檢查當前可用資源的數(shù)量,如果大于零,則表示有可用資源,并將計數(shù)器減一,任務(wù)獲取到資源,并繼續(xù)執(zhí)行。如果當前可用資源的數(shù)量為零,則任務(wù)會被阻塞,直到有其他任務(wù)釋放資源。
當一個任務(wù)完成對資源的使用后,它會調(diào)用Release方法來釋放資源。該方法會將計數(shù)器加一,表示資源已經(jīng)可用,其他被阻塞的任務(wù)可以繼續(xù)獲取資源并執(zhí)行。
通過這種方式,Weighted實現(xiàn)了對資源的限制和控制。它確保在同一時間內(nèi)只有一定數(shù)量的并發(fā)任務(wù)可以訪問資源,超過限制的任務(wù)會被阻塞,直到有其他任務(wù)釋放資源。這樣可以有效地避免資源過度使用和競爭,保證系統(tǒng)的穩(wěn)定性和性能。
4.3 代碼實現(xiàn)
4.3.1 結(jié)構(gòu)體定義
Weighted的結(jié)構(gòu)體定義如下:
type Weighted struct { size int64 cur int64 mu sync.Mutex waiters list.List }
- size:表示資源的總數(shù)量,即可以同時獲取的最大資源數(shù)量。
- cur:表示當前已經(jīng)被獲取的資源數(shù)量。
- mu:用于保護Weighted類型的互斥鎖,確保并發(fā)安全性。
- waiters:使用雙向鏈表來存儲等待獲取資源的任務(wù)。
4.3.2 Acquire方法
Acquire方法將獲取指定數(shù)量的資源。如果當前可用資源數(shù)量不足,調(diào)用此方法的任務(wù)將被阻塞,并加入到等待隊列中。
func (s *Weighted) Acquire(ctx context.Context, n int64) error { // 1. 使用互斥鎖s.mu對Weighted類型進行加鎖,確保并發(fā)安全性。 s.mu.Lock() // size - cur 代表剩余可用資源數(shù),如果大于請求資源數(shù)n, 此時代表剩余可用資源 大于 需要的資源數(shù) // 其次,Weighted資源分配的順序是FIFO,如果等待隊列不為空,當前請求就需要自動放到隊列最后面 if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } // s.size 代表最大資源數(shù),如果需要的資源數(shù) 大于 最大資源數(shù),此時直接返回錯誤 if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() <-ctx.Done() return ctx.Err() } // 這里代表著當前暫時獲取不到資源,此時將創(chuàng)建一個waiter對象放到等待隊列最后 ready := make(chan struct{}) // waiter對象中包含需要獲取的資源數(shù)量n和通知通道ready。 w := waiter{n: n, ready: ready} // 將waiter對象放到隊列最后 elem := s.waiters.PushBack(w) // 釋放鎖,讓其他請求進來 s.mu.Unlock() select { // 如果ctx.Done()通道被關(guān)閉,表示上下文已取消,任務(wù)需要返回錯誤。 case <-ctx.Done(): err := ctx.Err() // 新獲取鎖,檢查是否已經(jīng)成功獲取資源。如果成功獲取資源,將錯誤置為nil,表示獲取成功; s.mu.Lock() select { // 通過判斷ready channel是否接收到信號,從而來判斷是否成功獲取資源 case <-ready: err = nil default: // 判斷是否是等待隊列中第一個元素 isFront := s.waiters.Front() == elem // 將該請求從等待隊列中移除 s.waiters.Remove(elem) // 如果是第一個等待對象,同時還有剩余資源,喚醒后面的waiter。說不定后面的waiter剛好符合條件 if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err // ready通道接收到數(shù)據(jù),代表此時已經(jīng)成功占據(jù)到資源了 case <-ready: return nil } }
Weighted對象用來控制可用資源的數(shù)量。它有兩個重要的字段,cur和size,分別表示當前可用的資源數(shù)量和總共可用的資源數(shù)量。
當一個請求通過Acquire方法請求資源時,首先會檢查剩余資源數(shù)量是否足夠,并且等待隊列中沒有其他請求在等待資源。如果滿足這兩個條件,請求就可以成功獲取到資源。
如果剩余資源數(shù)量不足以滿足請求,那么一個waiter的對象會被創(chuàng)建并放入等待隊列中。waiter對象包含了請求需要的資源數(shù)量n和一個用于通知的通道ready。當其他請求調(diào)用Release方法釋放資源時,它們會檢查等待隊列中的waiter對象是否滿足資源需求,如果滿足,就會將資源分配給該waiter對象,并通過ready通道來通知它可以執(zhí)行業(yè)務(wù)邏輯了。
即使剩余資源數(shù)量大于請求所需數(shù)量,如果等待隊列中存在等待的請求,新的請求也會被放入等待隊列中,而不管資源是否足夠。這可能導致一些請求長時間等待資源,導致資源的浪費和延遲。因此,在使用Weighted進行資源控制時,需要謹慎評估資源配額,并避免資源饑餓的情況發(fā)生,以免影響系統(tǒng)的性能和響應(yīng)能力。
4.3.3 Release方法
Release方法將釋放指定數(shù)量的資源。當資源被釋放時,會檢查等待隊列中的任務(wù)。它從隊頭開始逐個檢查等待的元素,并嘗試為它們分配資源,直到最后一個不滿足資源條件的元素為止。
func (s *Weighted) Release(n int64) { // 1. 使用互斥鎖s.mu對Weighted類型進行加鎖,確保并發(fā)安全性。 s.mu.Lock() // 2. 釋放資源 s.cur -= n // 3. 異常情況處理 if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } // 4. 喚醒等待任務(wù) s.notifyWaiters() s.mu.Unlock() }
可以看到,Release方法實現(xiàn)相對比較簡單,釋放資源后,便直接調(diào)用notifyWaiters方法喚醒處于等待狀態(tài)的任務(wù)。下面來看看notifyWaiters方法的具體實現(xiàn):
func (s *Weighted) notifyWaiters() { for { // 獲取隊頭元素 next := s.waiters.Front() // 已經(jīng)沒有處于等待狀態(tài)的協(xié)程,此時直接返回 if next == nil { break // No more waiters blocked. } w := next.Value.(waiter) // 如果資源不滿足要求 當前waiter的要求,此時直接返回 if s.size-s.cur < w.n { break } // 否則占據(jù)waiter需要的資源數(shù) s.cur += w.n // 移除等待元素 s.waiters.Remove(next) // 喚醒處于等待狀態(tài)的任務(wù),Acquire方法會 <- ready 來等待信號的到來 close(w.ready) } }
notifyWaiters方法會從隊頭開始獲取元素,判斷當前資源的剩余數(shù),是否滿足waiter的要求,如果滿足的話,此時先占據(jù)該waiter需要的資源,之后再將其從等待隊列中移除,最后調(diào)用close方法,喚醒處于等待狀態(tài)的任務(wù)。 之后,再繼續(xù)隊列中取出元素,判斷是否滿足條件,循環(huán)反復,直到不滿足waiter的條件為止。
4.3.4 TryAcquire方法
TryAcquire方法將嘗試獲取指定數(shù)量的資源,但不會阻塞。如果可用資源不足,它會立即返回一個錯誤,而不是阻塞等待。實現(xiàn)比較簡單,只是簡單檢查當前資源數(shù)是否滿足要求而已,具體如下:
func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success }
5. 注意事項
5.1 及時釋放資源
當使用Weighted來管理資源時,確保在使用完資源后,及時調(diào)用Release方法釋放資源。如果不這樣做,將會導致資源泄漏,最終導致所有的請求都將無法被處理。下面展示一個簡單的代碼說明:
package main import ( "fmt" "sync" "time" "golang.org/x/sync/semaphore" ) func main() { sem := semaphore.NewWeighted(5) // 創(chuàng)建一個最大并發(fā)數(shù)為5的Weighted實例 // 模擬使用資源的任務(wù) task := func(id int) { //1. 成功獲取資源 if err := sem.Acquire(context.Background(), 1); err != nil { fmt.Printf("Task %d failed to acquire resource: %s\n", id, err) return } // 2. 任務(wù)處理完成之后,資源沒有被釋放 // defer sem.Release(1) // 使用defer確保在任務(wù)完成后釋放資源 } // 啟動多個任務(wù)并發(fā)執(zhí)行 var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() task(id) }(i) } wg.Wait() // 等待所有任務(wù)完成 }
在上面的代碼中,我們使用Weighted來控制最大并發(fā)數(shù)為5。我們在任務(wù)中沒有調(diào)用sem.Release(1)釋放資源,這些資源將一直被占用,后面啟動的5個任務(wù)將永遠無法獲取到資源,此時將永遠不會繼續(xù)執(zhí)行下去。因此,務(wù)必在使用完資源后及時調(diào)用Release方法釋放資源,以確保資源的正確回收和釋放,保證系統(tǒng)的穩(wěn)定性和性能。
而且這里最好使用defer語句來實現(xiàn)資源的釋放,避免Release函數(shù)在某些異常場景下無法被執(zhí)行到。
5.2 合理設(shè)置并發(fā)數(shù)
Weighted只是提供了一種管理資源的手段,具體的并發(fā)數(shù)還需要開發(fā)人員自行根據(jù)系統(tǒng)的實際需求和資源限制,合理設(shè)置Weighted實例的最大并發(fā)數(shù)。過大的并發(fā)數(shù)可能導致資源過度競爭,而過小的并發(fā)數(shù)可能限制了系統(tǒng)的吞吐量。
具體操作可以到線上預發(fā)布環(huán)境,不斷調(diào)整觀察,獲取到一個最合適的并發(fā)數(shù)。
5.3 考慮Weighted是否適用于當前場景
Weighted 類型可以用于限制并發(fā)訪問資源的數(shù)量,但它也存在一些潛在的缺點,需要根據(jù)具體的應(yīng)用場景和需求權(quán)衡利弊。
首先是內(nèi)存開銷,Weighted 類型使用一個 sync.Mutex 以及一個 list.List 來管理等待隊列,這可能會占用一定的內(nèi)存開銷。對于大規(guī)模的并發(fā)處理,特別是在限制極高的情況下,可能會影響系統(tǒng)的內(nèi)存消耗。
其次是Weighted 類型一旦初始化,最大并發(fā)數(shù)是固定的,無法在運行時動態(tài)調(diào)整。如果你的應(yīng)用程序需要根據(jù)負載情況動態(tài)調(diào)整并發(fā)限制,可能需要使用其他機制或?qū)崿F(xiàn)。
而且Weighted是嚴格按照FIFO請求順序來分配資源的,當某些請求的權(quán)重過大時,可能會導致其他請求饑餓,即長時間等待資源。
最后,則是由于 Weighted 類型使用了互斥鎖來保護共享狀態(tài),因此在高并發(fā)情況下,爭奪鎖可能成為性能瓶頸,影響系統(tǒng)的吞吐量。
因此,在使用 Weighted 類型時,需要根據(jù)具體的應(yīng)用場景和需求權(quán)衡利弊,從而來決定是否使用Weighted來實現(xiàn)資源的管理控制。
6. 總結(jié)
本文介紹了一種解決系統(tǒng)中資源管理問題的解決方案Weighted。本文從問題引出,詳細介紹了Weighted的特點和使用方法。通過了解Weighted的設(shè)計初衷和實現(xiàn)原理,讀者可以更好地理解其工作原理。
同時,文章提供了使用Weighted時需要注意的事項,如及時釋放資源、合理設(shè)置并發(fā)數(shù)等,從而幫助讀者避免潛在的問題,以及能夠在比較合適的場景下使用到Weighted類型實現(xiàn)資源管理?;诖?,我們完成了對Weighted的介紹,希望對你有所幫助。
到此這篇關(guān)于Go使用Weighted實現(xiàn)資源管理的文章就介紹到這了,更多相關(guān)Go實現(xiàn)資源管理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言中三種容器類型的數(shù)據(jù)結(jié)構(gòu)詳解
在?Go?語言中,有三種主要的容器類型用于存儲和操作集合數(shù)據(jù)這篇文章主要為大家介紹了三者的使用與區(qū)別,感興趣的小伙伴可以跟隨小編一起學習一下2025-02-02go?tool?pprof?參數(shù)?'-base'?和?'-diff_base&
這篇文章主要介紹了go?tool?pprof?參數(shù)?'-base'?和?'-diff_base'之間的區(qū)別,兩個參數(shù)都是用于計算當前?profile文件減去基準profile文件所獲得的差值,用這個差值生成一個新的profile文件,本文給大家介紹的非常詳細,需要的朋友可以參考下2023-05-05golang?防緩存擊穿singleflight的實現(xiàn)
本文主要介紹了golang?防緩存擊穿singleflight的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-08-08在Golang中使用http.FileServer返回靜態(tài)文件的操作
這篇文章主要介紹了在Golang中使用http.FileServer返回靜態(tài)文件的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12