C#實(shí)現(xiàn)控制線程池最大數(shù)并發(fā)線程
1. 實(shí)驗(yàn)?zāi)康模?/strong>
使用線程池的時(shí)候,有時(shí)候需要考慮服務(wù)器的最大線程數(shù)目和程序最快執(zhí)行所有業(yè)務(wù)邏輯的取舍。
并非邏輯線程越多也好,而且新的邏輯線程必須會(huì)在線程池的等待隊(duì)列中等待 ,直到線程池中工作的線程執(zhí)行完畢,
才會(huì)有系統(tǒng)線程取出等待隊(duì)列中的邏輯線程,進(jìn)行CPU運(yùn)算。
2. 解決問(wèn)題:
<a>如果不考慮服務(wù)器實(shí)際可支持的最大并行線程個(gè)數(shù),程序不停往線程池申請(qǐng)新的邏輯線程,這個(gè)時(shí)候我們可以發(fā)現(xiàn)CPU的使用率會(huì)不斷飆升,并且內(nèi)存、網(wǎng)絡(luò)帶寬占用也會(huì)隨著邏輯線程在CPU隊(duì)列中堆積,而不斷增大。
<b>如果我們想在主程序有200個(gè)http網(wǎng)絡(luò)通訊需要執(zhí)行,如何每次循環(huán)用10個(gè)線程并發(fā)處理10個(gè)網(wǎng)絡(luò)http通訊回話,下一次循環(huán)只有在上一次循環(huán)的10個(gè)線程都執(zhí)行完畢后才會(huì)執(zhí)行下一次循環(huán),并且主程序監(jiān)聽(tīng)和等待200個(gè)http網(wǎng)絡(luò)通訊都在CPU線程池中執(zhí)行完畢后,才會(huì)退出主程序。
3. 實(shí)現(xiàn)邏輯:
我們通過(guò)兩個(gè)AutoResetEvent和線程監(jiān)聽(tīng)器Monitor,分別實(shí)現(xiàn):
<a>wait_sync: 任務(wù)線程的 并發(fā)執(zhí)行,每次循環(huán)只處理最大10個(gè)線程分別對(duì)網(wǎng)絡(luò)做http通訊回話。并且當(dāng)前循環(huán)的10個(gè)線程都執(zhí)行完畢后,才會(huì)進(jìn)行下一次循環(huán)處理。
<b> wait_main: 主程序線程的監(jiān)聽(tīng)和等待,只有所有任務(wù)線程都執(zhí)行完畢后,主程序線程才會(huì)退出程序。
<c> list_Thread: 負(fù)責(zé)記錄每次循環(huán),CPU實(shí)際分配的系統(tǒng)線程的個(gè)數(shù)。和Monitor配合使用,Monitor.Enter(list_Thread)=占用共享線程資源的占用鎖,Monitor.Exit(list_Thread)釋放共享線程資源的占用鎖。
<d> n_total_thread: 配合wait_main使用,記錄全部邏輯線程,已經(jīng)執(zhí)行完畢的當(dāng)前總個(gè)數(shù),用來(lái)判斷主線程是否還需要繼續(xù)等待,還是可以結(jié)束主程序運(yùn)行。
4. 主要代碼:
<a> 線程池控制代碼,如下:
/// <summary> /// 多線程調(diào)用WCF /// </summary> /// <param name="select">調(diào)用WCF的方式,1=Restful,2=Tcp</param> /// <param name="num"></param> static void DoTest_MultiThread(string select, long num) { int n_max_thread = 10; // 設(shè)置并行最大為10個(gè)線程 int n_total_thread = 0; // 用來(lái)控制:主程序的結(jié)束執(zhí)行,當(dāng)所有任務(wù)線程執(zhí)行完畢 ILog log_add = new LogHelper("Add_Thread"); ILog log_del = new LogHelper("Del_Thread"); ILog log_wait = new LogHelper("Wait_Thread"); ILog log_set = new LogHelper("Set_Thread"); ILog log_for = new LogHelper("For_Thread"); Console.Title = string.Format("調(diào)用WCF的方式 => {0}, 調(diào)用次數(shù)=> {1}" , select == "1" ? "Restful" : "Socket" , num); List<int> list_Thread = new List<int>(); System.Threading.AutoResetEvent wait_sync = new System.Threading.AutoResetEvent(false); // 用來(lái)控制:并發(fā)最大個(gè)數(shù)線程=n_max_thread System.Threading.AutoResetEvent wait_main = new System.Threading.AutoResetEvent(false); // 用來(lái)控制:主程序的結(jié)束執(zhí)行,當(dāng)所有任務(wù)線程執(zhí)行完畢 DateTime date_step = DateTime.Now; for (long i = 0; i < num; i++) { Num_Query_Static++; if (i >0 && (i+1-1) % n_max_thread == 0) // -1 表示第max個(gè)線程尚未開(kāi)始 { //log_wait.Info(string.Format("thread n= {0},for i= {1}", dic_Thread.Count, i + 1)); wait_sync.WaitOne(); // 每次并發(fā)10個(gè)線程,等待處理完畢后,在發(fā)送下一次并發(fā)線程 } log_for.Info(string.Format("thread n= {0},for i= {1}", list_Thread.Count, i + 1)); System.Threading.ThreadPool.QueueUserWorkItem ((data) => { int id = System.Threading.Thread.CurrentThread.ManagedThreadId; System.Threading.Monitor.Enter(list_Thread); list_Thread.Add(id); System.Threading.Monitor.Exit(list_Thread); log_add.Info(string.Format("id={0}, count={1}", id, list_Thread.Count)); // 日志 if (select == "1") // Restful方式調(diào)用 { Query_Htty(); } else { Query_Socket(); } n_total_thread += 1; if (list_Thread.Count == (n_max_thread) || n_total_thread == num) { list_Thread.Clear(); //log_set.Info(string.Format("thread n= {0},for i= {1}", dic_Thread.Count, i + 1)); //wait_sync.Set(); if (n_total_thread != num) { wait_sync.Set(); // 任務(wù)線程,繼續(xù)執(zhí)行 } else { wait_main.Set(); // 主程序線程,繼續(xù)執(zhí)行 } } }, list_Thread); } wait_main.WaitOne(); Console.WriteLine(string.Format("總測(cè)試{0}次,總耗時(shí){1}, 平均耗時(shí){2}" , num , (DateTime.Now - date_step).ToString() , (DateTime.Now - date_step).TotalMilliseconds / num)); Query_Thread(); }
<b> WCF后臺(tái)服務(wù)代碼
private static ILog log = new LogHelper("SeqService"); // 日志 private static Dictionary<int, DateTime> dic_thread = new Dictionary<int, DateTime>(); // 線程列表 private static long Num = 0; // 線程個(gè)數(shù) private static object lock_Num = 0; // 共享數(shù)據(jù)-鎖 /// <summary> /// 在線申請(qǐng)流水號(hào) /// </summary> /// <returns></returns> [WebGet(UriTemplate = "GetSeqNum/Json", ResponseFormat = WebMessageFormat.Json)] public string GetSeqNumber() { lock (lock_Num) { Num++; int id_thread = System.Threading.Thread.CurrentThread.ManagedThreadId; DateTime now = DateTime.Now; if (!dic_thread.TryGetValue(id_thread, out now)) { dic_thread.Add(id_thread, DateTime.Now); } } string ret = DateTime.Now.ToString("yyyyMMdd") + Num.ToString(new string('0', 9)); log.Info(string.Format("{0}, Thread={1}/{2}", ret, System.Threading.Thread.CurrentThread.ManagedThreadId, dic_thread.Count)); return ret; }
5. 實(shí)驗(yàn)結(jié)果
1. 10000個(gè)WCF網(wǎng)絡(luò)http請(qǐng)求,CPU分成每次10個(gè)(10可以按需求調(diào)整)線程并發(fā)執(zhí)行,并且主程序在所有請(qǐng)求都執(zhí)行完畢后,才退出主程序。
1. 前端日志:LogFile\Add_Thread\Info
2. WCF日志:LogFile\SeqService\Info
相關(guān)文章
C#中使用快速排序按文件創(chuàng)建時(shí)間將文件排序的源碼
C#中使用快速排序按文件創(chuàng)建時(shí)間將文件排序的源碼...2007-03-03C#使用DirectX.DirectSound播放語(yǔ)音
這篇文章主要為大家詳細(xì)介紹了C#使用DirectX.DirectSound播放語(yǔ)音,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03c#中Empty()和DefalutIfEmpty()用法分析
這篇文章主要介紹了c#中Empty()和DefalutIfEmpty()用法,以實(shí)例形式分析了針對(duì)不同情況下Empty()和DefalutIfEmpty()用法區(qū)別,需要的朋友可以參考下2014-11-11C# Redis學(xué)習(xí)系列(二)Redis基本設(shè)置
這篇文章主要為大家分享了C# Redis學(xué)習(xí)系列教程第二篇, Redis基本設(shè)置,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05