C#實現(xiàn)控制線程池最大數(shù)并發(fā)線程
1. 實驗目的:
使用線程池的時候,有時候需要考慮服務器的最大線程數(shù)目和程序最快執(zhí)行所有業(yè)務邏輯的取舍。
并非邏輯線程越多也好,而且新的邏輯線程必須會在線程池的等待隊列中等待 ,直到線程池中工作的線程執(zhí)行完畢,
才會有系統(tǒng)線程取出等待隊列中的邏輯線程,進行CPU運算。
2. 解決問題:
<a>如果不考慮服務器實際可支持的最大并行線程個數(shù),程序不停往線程池申請新的邏輯線程,這個時候我們可以發(fā)現(xiàn)CPU的使用率會不斷飆升,并且內(nèi)存、網(wǎng)絡帶寬占用也會隨著邏輯線程在CPU隊列中堆積,而不斷增大。
<b>如果我們想在主程序有200個http網(wǎng)絡通訊需要執(zhí)行,如何每次循環(huán)用10個線程并發(fā)處理10個網(wǎng)絡http通訊回話,下一次循環(huán)只有在上一次循環(huán)的10個線程都執(zhí)行完畢后才會執(zhí)行下一次循環(huán),并且主程序監(jiān)聽和等待200個http網(wǎng)絡通訊都在CPU線程池中執(zhí)行完畢后,才會退出主程序。
3. 實現(xiàn)邏輯:
我們通過兩個AutoResetEvent和線程監(jiān)聽器Monitor,分別實現(xiàn):
<a>wait_sync: 任務線程的 并發(fā)執(zhí)行,每次循環(huán)只處理最大10個線程分別對網(wǎng)絡做http通訊回話。并且當前循環(huán)的10個線程都執(zhí)行完畢后,才會進行下一次循環(huán)處理。
<b> wait_main: 主程序線程的監(jiān)聽和等待,只有所有任務線程都執(zhí)行完畢后,主程序線程才會退出程序。
<c> list_Thread: 負責記錄每次循環(huán),CPU實際分配的系統(tǒng)線程的個數(shù)。和Monitor配合使用,Monitor.Enter(list_Thread)=占用共享線程資源的占用鎖,Monitor.Exit(list_Thread)釋放共享線程資源的占用鎖。
<d> n_total_thread: 配合wait_main使用,記錄全部邏輯線程,已經(jīng)執(zhí)行完畢的當前總個數(shù),用來判斷主線程是否還需要繼續(xù)等待,還是可以結(jié)束主程序運行。
4. 主要代碼:
<a> 線程池控制代碼,如下:
/// <summary> /// 多線程調(diào)用WCF /// </summary> /// <param name="select">調(diào)用WCF的方式,1=Restful,2=Tcp</param> /// <param name="num"></param> static void DoTest_MultiThread(string select, long num) { int n_max_thread = 10; // 設置并行最大為10個線程 int n_total_thread = 0; // 用來控制:主程序的結(jié)束執(zhí)行,當所有任務線程執(zhí)行完畢 ILog log_add = new LogHelper("Add_Thread"); ILog log_del = new LogHelper("Del_Thread"); ILog log_wait = new LogHelper("Wait_Thread"); ILog log_set = new LogHelper("Set_Thread"); ILog log_for = new LogHelper("For_Thread"); Console.Title = string.Format("調(diào)用WCF的方式 => {0}, 調(diào)用次數(shù)=> {1}" , select == "1" ? "Restful" : "Socket" , num); List<int> list_Thread = new List<int>(); System.Threading.AutoResetEvent wait_sync = new System.Threading.AutoResetEvent(false); // 用來控制:并發(fā)最大個數(shù)線程=n_max_thread System.Threading.AutoResetEvent wait_main = new System.Threading.AutoResetEvent(false); // 用來控制:主程序的結(jié)束執(zhí)行,當所有任務線程執(zhí)行完畢 DateTime date_step = DateTime.Now; for (long i = 0; i < num; i++) { Num_Query_Static++; if (i >0 && (i+1-1) % n_max_thread == 0) // -1 表示第max個線程尚未開始 { //log_wait.Info(string.Format("thread n= {0},for i= {1}", dic_Thread.Count, i + 1)); wait_sync.WaitOne(); // 每次并發(fā)10個線程,等待處理完畢后,在發(fā)送下一次并發(fā)線程 } log_for.Info(string.Format("thread n= {0},for i= {1}", list_Thread.Count, i + 1)); System.Threading.ThreadPool.QueueUserWorkItem ((data) => { int id = System.Threading.Thread.CurrentThread.ManagedThreadId; System.Threading.Monitor.Enter(list_Thread); list_Thread.Add(id); System.Threading.Monitor.Exit(list_Thread); log_add.Info(string.Format("id={0}, count={1}", id, list_Thread.Count)); // 日志 if (select == "1") // Restful方式調(diào)用 { Query_Htty(); } else { Query_Socket(); } n_total_thread += 1; if (list_Thread.Count == (n_max_thread) || n_total_thread == num) { list_Thread.Clear(); //log_set.Info(string.Format("thread n= {0},for i= {1}", dic_Thread.Count, i + 1)); //wait_sync.Set(); if (n_total_thread != num) { wait_sync.Set(); // 任務線程,繼續(xù)執(zhí)行 } else { wait_main.Set(); // 主程序線程,繼續(xù)執(zhí)行 } } }, list_Thread); } wait_main.WaitOne(); Console.WriteLine(string.Format("總測試{0}次,總耗時{1}, 平均耗時{2}" , num , (DateTime.Now - date_step).ToString() , (DateTime.Now - date_step).TotalMilliseconds / num)); Query_Thread(); }
<b> WCF后臺服務代碼
private static ILog log = new LogHelper("SeqService"); // 日志 private static Dictionary<int, DateTime> dic_thread = new Dictionary<int, DateTime>(); // 線程列表 private static long Num = 0; // 線程個數(shù) private static object lock_Num = 0; // 共享數(shù)據(jù)-鎖 /// <summary> /// 在線申請流水號 /// </summary> /// <returns></returns> [WebGet(UriTemplate = "GetSeqNum/Json", ResponseFormat = WebMessageFormat.Json)] public string GetSeqNumber() { lock (lock_Num) { Num++; int id_thread = System.Threading.Thread.CurrentThread.ManagedThreadId; DateTime now = DateTime.Now; if (!dic_thread.TryGetValue(id_thread, out now)) { dic_thread.Add(id_thread, DateTime.Now); } } string ret = DateTime.Now.ToString("yyyyMMdd") + Num.ToString(new string('0', 9)); log.Info(string.Format("{0}, Thread={1}/{2}", ret, System.Threading.Thread.CurrentThread.ManagedThreadId, dic_thread.Count)); return ret; }
5. 實驗結(jié)果
1. 10000個WCF網(wǎng)絡http請求,CPU分成每次10個(10可以按需求調(diào)整)線程并發(fā)執(zhí)行,并且主程序在所有請求都執(zhí)行完畢后,才退出主程序。
1. 前端日志:LogFile\Add_Thread\Info
2. WCF日志:LogFile\SeqService\Info
相關文章
C#中使用快速排序按文件創(chuàng)建時間將文件排序的源碼
C#中使用快速排序按文件創(chuàng)建時間將文件排序的源碼...2007-03-03c#中Empty()和DefalutIfEmpty()用法分析
這篇文章主要介紹了c#中Empty()和DefalutIfEmpty()用法,以實例形式分析了針對不同情況下Empty()和DefalutIfEmpty()用法區(qū)別,需要的朋友可以參考下2014-11-11