Java?多線(xiàn)程并發(fā)編程提高數(shù)據(jù)處理效率的詳細(xì)過(guò)程
??工作場(chǎng)景中遇到這樣一個(gè)需求:根據(jù)主機(jī)的 IP 地址聯(lián)動(dòng)更新其他模型的相關(guān)信息。需求很簡(jiǎn)單,只涉及一般的數(shù)據(jù)庫(kù)聯(lián)動(dòng)查詢(xún)以及更新操作,然而在編碼實(shí)現(xiàn)過(guò)程中發(fā)現(xiàn),由于主機(jī)的數(shù)量很多,導(dǎo)致循環(huán)遍歷查詢(xún)、更新時(shí)花費(fèi)很長(zhǎng)的時(shí)間,調(diào)用一次接口大概需要 30-40 min 時(shí)間才能完成操作。
??因此,為了有效縮短接口方法的執(zhí)行時(shí)間,便考慮使用多線(xiàn)程并發(fā)編程方法,利用多核處理器并行執(zhí)行的能力,通過(guò)異步處理數(shù)據(jù)的方式,便可以大大縮短執(zhí)行時(shí)間,提高執(zhí)行效率。
??這里使用可重用固定線(xiàn)程數(shù)的線(xiàn)程池 FixedThreadPool
,并使用 CountDownLatch
并發(fā)工具類(lèi)提供的并發(fā)流程控制工具作為配合使用,保證多線(xiàn)程并發(fā)編程過(guò)程中的正常運(yùn)行:
- 首先,通過(guò)
Runtime.getRuntime().availableProcessors()
方法獲取運(yùn)行機(jī)器的 CPU 線(xiàn)程數(shù),用于后續(xù)設(shè)置固定線(xiàn)程池的線(xiàn)程數(shù)量。 - 其次,判斷任務(wù)的特性,如果為計(jì)算密集型任務(wù)則設(shè)置線(xiàn)程數(shù)為
CPU 線(xiàn)程數(shù)+1
,如果為 IO 密集型任務(wù)則設(shè)置線(xiàn)程數(shù)為2 * CPU 線(xiàn)程數(shù)
,由于在方法中需要與數(shù)據(jù)庫(kù)進(jìn)行頻繁的交互,因此屬于 IO 密集型任務(wù)。 - 之后,對(duì)數(shù)據(jù)進(jìn)行分組切割,每個(gè)線(xiàn)程處理一個(gè)分組的數(shù)據(jù),分組的組數(shù)與線(xiàn)程數(shù)保持一致,并且還要?jiǎng)?chuàng)建計(jì)數(shù)器對(duì)象
CountDownLatch
,調(diào)用構(gòu)造函數(shù),初始化參數(shù)值為線(xiàn)程數(shù)個(gè)數(shù),保證主線(xiàn)程等待所有子線(xiàn)程運(yùn)行結(jié)束后,再進(jìn)行后續(xù)的操作。 - 然后,調(diào)用
executorService.execute()
方法,重寫(xiě)run
方法編寫(xiě)業(yè)務(wù)邏輯與數(shù)據(jù)處理代碼,執(zhí)行完當(dāng)前線(xiàn)程后記得將計(jì)數(shù)器減1操作。 - 最后,當(dāng)所有子線(xiàn)程執(zhí)行完成后,關(guān)閉線(xiàn)程池。
?在省略工作場(chǎng)景中的業(yè)務(wù)邏輯代碼后,通用的處理方法示例如下所示:
public ResponseData updateHostDept() { // ... List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host"); // split the hostMapList for the following multi-threads task // return the number of logical CPUs int processorsNum = Runtime.getRuntime().availableProcessors(); // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks, // if Computing Tasks set the threadNum as (the number of logical CPUs) + 1 int threadNum = processorsNum * 2; // the number of each group data int eachGroupNum = hostMapList.size() / threadNum; List<List<Map>> groupList = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { int start = i * eachGroupNum; if (i == threadNum - 1) { int end = mapList.size(); groupList.add(hostMapList.subList(start, end)); } else { int end = (i+1) * eachGroupNum; groupList.add(hostMapList.subList(start, end)); } } // update data by using multi-threads asynchronously ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (List<Map> group : groupList) { executorService.execute(()->{ try { for (Map map : group) { // update the data in mongodb } } catch (Exception e) { e.printStackTrace(); } finally { // let counter minus one countDownLatch.countDown(); } }); } try { // main thread donnot execute until all child threads finish countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } // remember to shutdown the threadPool executorService.shutdown(); return ResponseData.success(); }
??那么在使用多線(xiàn)程異步更新的策略后,從當(dāng)初調(diào)用接口所需的大概時(shí)間為 30-40 min
下降到了 8-10 min
,大大提高了執(zhí)行效率。
??需要注意的是,這里使用的
newFixedThreadPool
創(chuàng)建線(xiàn)程池,它有一個(gè)缺陷就是,它的阻塞隊(duì)列默認(rèn)是一個(gè)無(wú)界隊(duì)列,默認(rèn)值為Integer.MAX_VALUE
極有可能會(huì)造成 OOM 問(wèn)題。因此,一般可以使用ThreadPoolExecutor
來(lái)創(chuàng)建線(xiàn)程池,自己可以指定等待隊(duì)列中的線(xiàn)程個(gè)數(shù),避免產(chǎn)生 OOM 問(wèn)題。
public ResponseData updateHostDept() { // ... List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host"); // split the hostMapList for the following multi-threads task // return the number of logical CPUs int processorsNum = Runtime.getRuntime().availableProcessors(); // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks, // if Computing Tasks set the threadNum as (the number of logical CPUs) + 1 int threadNum = processorsNum * 2; // the number of each group data int eachGroupNum = hostMapList.size() / threadNum; List<List<Map>> groupList = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { int start = i * eachGroupNum; if (i == threadNum - 1) { int end = mapList.size(); groupList.add(hostMapList.subList(start, end)); } else { int end = (i+1) * eachGroupNum; groupList.add(hostMapList.subList(start, end)); } } // update data by using multi-threads asynchronously ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (List<Map> group : groupList) { executor.execute(()->{ try { for (Map map : group) { // update the data in mongodb } } catch (Exception e) { e.printStackTrace(); } finally { // let counter minus one countDownLatch.countDown(); } }); } try { // main thread donnot execute until all child threads finish countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } // remember to shutdown the threadPool executor.shutdown(); return ResponseData.success(); }
在上述的代碼中,核心線(xiàn)程數(shù)和最大線(xiàn)程數(shù)分別為 5 和 8,并沒(méi)有設(shè)置的很大的值,因?yàn)槿绻绻O(shè)置的很大,線(xiàn)程間頻繁的上下文切換也會(huì)增加時(shí)間消耗,反而不能最大程度上發(fā)揮多線(xiàn)程的優(yōu)勢(shì)。至于如何選擇合適的參數(shù),需要根據(jù)機(jī)器的參數(shù)以及任務(wù)的類(lèi)型綜合考慮決定。
??最后補(bǔ)充一點(diǎn),如果想要通過(guò)非編碼的方式獲取機(jī)器的 CPU 線(xiàn)程個(gè)數(shù)也很簡(jiǎn)單,windows 系統(tǒng)通過(guò)任務(wù)管理器,選擇 “性能”,便可以查看 CPU 線(xiàn)程個(gè)數(shù)的情況,如下圖所示:
??從上圖可以看到,我的機(jī)器中內(nèi)核是八個(gè) CPU,但是通過(guò)超線(xiàn)程技術(shù)一個(gè)物理的 CPU 核心可以模擬成兩個(gè)邏輯 CPU 線(xiàn)程,因此我的機(jī)器是支持8核16線(xiàn)程的。
到此這篇關(guān)于Java 多線(xiàn)程并發(fā)編程提高數(shù)據(jù)處理效率的文章就介紹到這了,更多相關(guān)Java 多線(xiàn)程提高數(shù)據(jù)處理效率內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JAVA8 List<List<Integer>> list中再裝一個(gè)list轉(zhuǎn)成一個(gè)list操
這篇文章主要介紹了JAVA8 List<List<Integer>> list中再裝一個(gè)list轉(zhuǎn)成一個(gè)list操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-08-08maven 在執(zhí)行package,install,deploy時(shí)使用clean與不使用clean的不同之處
有時(shí)候用mvn install后,新改的內(nèi)容不生效,一定要后來(lái)使用mvn clean install 才生效,由于之前沒(méi)有做記錄,以及記不清是什么情況下才會(huì)出現(xiàn)的問(wèn)題,于是想看看clean和不clean的區(qū)別,感興趣的朋友跟隨小編一起看看吧2021-08-08spring?boot?validation參數(shù)校驗(yàn)與分組嵌套各種類(lèi)型及使用小結(jié)
參數(shù)校驗(yàn)基本上是controller必做的事情,畢竟前端傳過(guò)來(lái)的一切都不可信,validation可以簡(jiǎn)化這一操作,這篇文章主要介紹了spring?boot?validation參數(shù)校驗(yàn)分組嵌套各種類(lèi)型及使用小結(jié),需要的朋友可以參考下2023-09-09Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之樹(shù)
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之樹(shù),文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java數(shù)據(jù)結(jié)構(gòu)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-05-05java接口用戶(hù)上下文的設(shè)計(jì)與實(shí)現(xiàn)
這篇文章主要為大家介紹了接口用戶(hù)上下文的設(shè)計(jì)與實(shí)現(xiàn)實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11