深入多線程之:深入生產(chǎn)者、消費(fèi)者隊(duì)列分析
上次我們使用AutoResetEvent實(shí)現(xiàn)了一個(gè)生產(chǎn)/消費(fèi)者隊(duì)列。這一次我們要使用Wait和Pulse方法來實(shí)現(xiàn)一個(gè)更強(qiáng)大的版本,它允許多個(gè)消費(fèi)者,每一個(gè)消費(fèi)者都在自己的線程中運(yùn)行。
我們使用數(shù)組來跟蹤線程。
Thread[] _workers;
通過跟蹤線程可以讓我們?cè)谒械木€程都結(jié)束后再結(jié)束我們的隊(duì)列任務(wù)。
每一個(gè)消費(fèi)者線程都執(zhí)行一個(gè)叫做Consume的方法,在一個(gè)for循環(huán)中,我們可以創(chuàng)建和啟動(dòng)線程。例如:
public PCQueue(int workerCount)
{
_workers = new Thread[workerCount];
for (int i = 0; i < workerCount; i++)
(_workers[i] = new Thread(Consume)).Start();
}
上次我們使用的是一個(gè)字符串來代表任務(wù),這次我們使用Action委托,它的定義如下:
Public delegate void Action();
為了表示一系列的任務(wù),我們使用Queue<T> 集合,例如:
Queue<Action> _itemQ = new Queue<Action>();
在我們調(diào)用生產(chǎn)(EnqueueItem)和消費(fèi)(Consume)方法前,還是完整的看一看代碼吧:
class PCQueue
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action> _itemQ = new Queue<Action>(); //保存任務(wù)的隊(duì)列
public PCQueue(int workerCount)
{
_workers = new Thread[workerCount];
for (int i = 0; i < workerCount; i++)
(_workers[i] = new Thread(Consume)).Start();
}
public void Shutdown(bool waitForWorkers)
{
//為每一個(gè)線程插入一個(gè)null item,可以是每一個(gè)worker 退出
foreach (Thread worker in _workers)
EnqueueItem(null);
//等待所有的線程退出。
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action item)
{
lock (_locker)
{
_itemQ.Enqueue(item);
Monitor.Pulse(_locker); //通知等待隊(duì)列中的線程
}
}
void Consume()
{
while (true)
{
Action item;
lock (_locker)
{
while (_itemQ.Count == 0)
{
Monitor.Wait(_locker); //釋放鎖,并阻止當(dāng)前線程,直到其他線程發(fā)送pulse信號(hào)。 }
item = _itemQ.Dequeue();
}
if (item == null) return; //退出的信號(hào)
item();
}
}
}
我們可以有一個(gè)退出策略,插入一個(gè)null item作為consumer退出的信號(hào)。如果我們想要快速的退出,可以使用一個(gè)獨(dú)立的”cancel” 標(biāo)記,因?yàn)槲覀冎С侄鄠€(gè)consumers,所以我們必須為每一個(gè)consumer插入一個(gè)null item。
下面是Main方法。使用兩個(gè)consumer線程,然后讓這兩個(gè)consumers執(zhí)行10個(gè)委托。
public static void Main()
{
PCQueue q = new PCQueue(2);
Console.WriteLine("Enqueuing 10 items...");
for (int i = 0; i < 10; i++)
{
int itemNumber = i;
q.EnqueueItem(() =>
{
Thread.Sleep(1000); //模擬耗時(shí)的工作
Console.WriteLine(" Task " + itemNumber);
});
}
q.Shutdown(true); //等待關(guān)閉
Console.WriteLine();
Console.WriteLine("Workers complete!");
}
下面讓我們細(xì)致的看一看EnqueueItem方法:
public void EnqueueItem(Action item)
{
lock (_locker)
{
_itemQ.Enqueue(item);
Monitor.Pulse(_locker); //通知等待隊(duì)列中的線程
}
}
因?yàn)槲覀兊年?duì)列_itemQ被多線程環(huán)境使用,因此在對(duì)_itemQ進(jìn)行讀取的時(shí)候需要加鎖lock.
因?yàn)槲覀儾迦肓艘粋€(gè)新的任務(wù),我們必須修改阻塞條件,也就是調(diào)用pulse方法,來喚醒調(diào)用了wait方法的線程。
出于對(duì)效率的考慮,當(dāng)插入一個(gè)Item的時(shí)候使用Pulse來代替PulseAll方法,因?yàn)榇蟛糠謺r(shí)候每一個(gè)Item只需要一個(gè)consumer來執(zhí)行。如果你有一個(gè)冰淇淋,你不可能叫30個(gè)睡眠的孩子都起來吃它,同樣,對(duì)于一個(gè)item,同時(shí)喚醒30個(gè)consumers一點(diǎn)好處都沒有。
讓我們?cè)倏纯碈onsumer方法。
我們希望當(dāng)沒什么事情做的時(shí)候,線程阻塞就可以了,換句話說,隊(duì)列中沒有item的時(shí)候,線程就應(yīng)該阻塞。因此我們的阻塞條件是_itemQ.Count ==0;
Action item;
lock (_locker)
{
while (_itemQ.Count == 0)
{
Monitor.Wait(_locker); //釋放鎖,并阻止當(dāng)前線程,直到其他線程發(fā)送pulse信號(hào)。 }
item = _itemQ.Dequeue();
}
if (item == null) return; //退出的信號(hào)
item();
while循環(huán)退出的時(shí)候也意味著_itemQ 至少有一個(gè)item。我們必須在釋放鎖之前調(diào)用你哦個(gè)dequeue方法來獲取item,考慮下下面的代碼:
lock (_locker)
{
while (_itemQ.Count == 0)
{
Monitor.Wait(_locker); //釋放鎖,并阻止當(dāng)前線程,直到其他線程發(fā)送pulse信號(hào)。 }
}
//現(xiàn)在在這里可能被搶占,_itemQ可能被修改
lock (_locker)
{
item = _itemQ.Dequeue();
}
在item被Dequeued后,我們就應(yīng)該立即釋放鎖了,如果我們?cè)趫?zhí)行task的時(shí)候,一直持有鎖,則會(huì)沒有必要的阻塞其他線程來獲取任務(wù)。
Wait Timeouts
在調(diào)用Wait方法的時(shí)候可以傳遞一個(gè)毫秒或Timespan的時(shí)間來設(shè)置超時(shí)。如果Wait超時(shí)了,那么Wait方法就會(huì)返回false。
帶有超時(shí)功能的Wait方法的主要步驟:
釋放鎖。
阻塞 直到 pulsed 或者超時(shí)。
重新獲取鎖。
超時(shí)就好像CLR 在超時(shí)到了的時(shí)候自動(dòng)的調(diào)用了 pulse方法一樣。
下面是使用超時(shí)的Wait的主要代碼:
lock(_locker)
while(<阻塞條件>)
Monitor.Wait(_locker,<超時(shí)時(shí)間>);
Monitor.Wait 方法返回一個(gè)bool值來代表是調(diào)用了pulse還是已經(jīng)超時(shí)了。
如果是true: 代表調(diào)用了pulse。
如果是false:代表超時(shí)了。
這對(duì)記錄日志很有用。
相關(guān)文章
C#數(shù)據(jù)結(jié)構(gòu)與算法揭秘三 鏈表
這節(jié)我們討論鏈表的基本操作,并且畫圖以證明,下屆中我們將討論雙向鏈表,環(huán)形鏈表 應(yīng)用舉例2012-11-11C#畫筆使用復(fù)合數(shù)組繪制單個(gè)矩形的方法
這篇文章主要介紹了C#畫筆使用復(fù)合數(shù)組繪制單個(gè)矩形的方法,涉及C#使用畫筆繪制圖形的相關(guān)技巧,需要的朋友可以參考下2015-06-06c# 在windows服務(wù)中 使用定時(shí)器實(shí)例代碼
這篇文章主要介紹了c# 在windows服務(wù)中 使用定時(shí)器實(shí)例代碼,有需要的朋友可以參考一下2013-12-12unity實(shí)現(xiàn)場(chǎng)景切換進(jìn)度條顯示
這篇文章主要為大家詳細(xì)介紹了unity實(shí)現(xiàn)場(chǎng)景切換進(jìn)度條顯示,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-11-11C#使用游標(biāo)實(shí)現(xiàn)補(bǔ)間函數(shù)
這篇文章主要為大家詳細(xì)介紹了C#使用游標(biāo)實(shí)現(xiàn)補(bǔ)間函數(shù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02